flask fastapi
for building APIs
flask는 확장성에 문제가 있어, 내부적으로 쓰인다.
multiple request에 대해 Asynchronous Programming
p5-hello-fastapi :
filter /Route and Changes
https://pillow.readthedocs.io/en/stable/
$ git stash
$ git checkout p5-hello-fastapi
$ conda update --all
$ conda install --file requirements.txt
from fastapi import FastAPI
app = FastAPI()


@app.get("/")
def index():
    """Answer ``GET /`` with a fixed greeting payload."""
    greeting = {"hello": "FastAPI"}
    return greeting
$ uvicorn main:app --reload
$ curl -X GET "http://127.0.0.1:8000"
p6-dictionary-fastapi :
filter /Route and Changes
https://pillow.readthedocs.io/en/stable/
$ git stash
$ git checkout p6-dictionary-fastapi
$ conda update --all
$ conda install --file requirements.txt
from fastapi import FastAPI
app = FastAPI()


@app.get("/")
def index():
    """Handle ``GET /`` by returning a constant hello dictionary."""
    payload = {"hello": "FastAPI"}
    return payload
# dictionary-api-python-flask/model/dbHandler.py
import sqlite3 as SQL
def match_exact(word: str, db_path: str = "data/dictionary.db") -> list:
    """Return every row of ``entries`` whose ``word`` column equals *word*.

    Args:
        word: The term to look up. It is bound as a query parameter and is
            never interpolated into the SQL text.
        db_path: Path of the SQLite database file. The default is the
            location that used to be hard-coded here.

    Returns:
        The list of row tuples produced by ``fetchall()``; empty when
        nothing matches.
    """
    from contextlib import closing

    sql_query = "SELECT * FROM entries WHERE word=?"
    # closing() releases the connection even when the query raises (for
    # example when the table is missing). sqlite3's own ``with connection``
    # only manages the transaction and does not close the connection.
    with closing(SQL.connect(db_path)) as db:
        return db.execute(sql_query, (word,)).fetchall()
def match_like(word: str, db_path: str = "data/dictionary.db") -> list:
    """Return every row of ``entries`` whose ``word`` column contains *word*.

    The search term is wrapped in ``%`` wildcards and bound as a parameter
    of a ``LIKE`` query. Any ``%`` or ``_`` already present in *word* is
    passed through unescaped and therefore also acts as a wildcard.

    Args:
        word: The substring to search for.
        db_path: Path of the SQLite database file. The default is the
            location that used to be hard-coded here.

    Returns:
        The list of row tuples produced by ``fetchall()``; empty when
        nothing matches.
    """
    from contextlib import closing

    sql_query = "SELECT * FROM entries WHERE word LIKE ?"
    pattern = "%" + word + "%"
    # closing() releases the connection even when the query raises;
    # sqlite3's own ``with connection`` does not close the connection.
    with closing(SQL.connect(db_path)) as db:
        return db.execute(sql_query, (pattern,)).fetchall()
from typing import Optional
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from model.dbHandler import match_exact, match_like
app = FastAPI()


@app.get("/")
def index():
    """Describe how to query the dictionary endpoint.

    Returns:
        A JSON-compatible dict whose ``usages`` entry shows the URL pattern
        of the ``/dict`` lookup.
    """
    # The /dict handler takes its argument as ``words``, so the hint has to
    # name that parameter; the previous "/dict?=" was not a usable URL.
    response = {"usages": "/dict?words=<word>"}
    return jsonable_encoder(response)
@app.get("/dict")
def dictionary(words: str):
    """Look up *words*: exact match first, substring match as a fallback.

    Args:
        words: The search term.

    Returns:
        A JSON-compatible dict with three keys: ``status`` ("success" for
        an exact hit, "partial" for a substring hit, "error" otherwise),
        ``word`` (the term echoed back) and ``data`` (the matching rows, or
        an error message).
    """
    if not words:
        return jsonable_encoder(
            {"status": "error", "word": words, "data": "word not valid!"}
        )

    exact = match_exact(words)
    if exact:
        # The status used to be misspelled "sccess", which no client
        # checking for "success" could ever match.
        return jsonable_encoder(
            {"status": "success", "word": words, "data": exact}
        )

    partial = match_like(words)
    if partial:
        return jsonable_encoder(
            {"status": "partial", "word": words, "data": partial}
        )

    return jsonable_encoder(
        {"status": "error", "word": words, "data": "word not found"}
    )
import uvicorn
if __name__ == "__main__":
    # Start the server only when this file is executed directly; it binds
    # to every interface on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
s7-filter-fastapi :
$ git stash
$ git checkout s7-filter-fastapi
#$ conda update --all
#$ conda install --file requirements.txt
$ conda update -n base -c defaults conda
$ conda install -c conda-forge aiohttp
from bin.filters import apply_filter
from typing import List, Optional
from fastapi import FastAPI, File, UploadFile
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
import io
# FastAPI application object; the route decorators below register on it.
app = FastAPI()
# Names of the filters this service accepts, in the order they are reported
# to clients. See the PIL ImageFilter documentation for the filters that are
# available out of the box.
filters_available = (
    ["blur", "contour", "detail"]
    + ["edge_enhance", "edge_enhance_more"]
    + ["emboss", "find_edges", "sharpen"]
    + ["smooth", "smooth_more"]
)
@app.api_route("/", methods=["GET", "POST"])
def index():
    """Return usage instructions for the image-filter service.

    Returns:
        A JSON-compatible dict with ``filters_available`` (the accepted
        filter names) and ``usage`` (the HTTP method and URL pattern to
        call).
    """
    response = {
        "filters_available": filters_available,
        # Mirrors the POST "/{filter}" route; the previous value "//" did
        # not correspond to any route of this application.
        "usage": {"http_method": "POST", "URL": "/<filter>"},
    }
    return jsonable_encoder(response)
@app.post("/{filter}")
def image_filter(filter: str, img: UploadFile = File(...)):
    """Apply the requested filter to an uploaded image and stream it back.

    Args:
        filter: Filter name taken from the URL path; it must be one of
            ``filters_available``.
        img: The uploaded image file (required).

    Returns:
        A JSON error payload when the filter name is not recognised,
        otherwise a ``StreamingResponse`` with media type ``image/jpeg``
        carrying the output of ``apply_filter``.
    """
    if filter in filters_available:
        output = apply_filter(img.file, filter)
        return StreamingResponse(output, media_type="image/jpeg")

    return jsonable_encoder({"error": "incorrect filter"})
from PIL import Image, ImageFilter
import io
def apply_filter(file: object, filter: str) -> io.BytesIO:
    """Apply a predefined PIL filter to an image and return it as JPEG bytes.

    Args:
        file: A file object (or anything ``Image.open`` accepts) holding
            the source image.
        filter: Name of an ``ImageFilter`` constant, in any letter case
            (for example ``"blur"`` selects ``ImageFilter.BLUR``).

    Returns:
        An ``io.BytesIO`` positioned at offset 0 that contains the filtered
        image encoded as JPEG.

    Raises:
        AttributeError: If ``ImageFilter`` has no attribute with the
            upper-cased name.
    """
    image = Image.open(file)

    # Palette images cannot be filtered and images with an alpha channel
    # cannot be written as JPEG, so bring those modes to plain RGB first.
    if image.mode in ("P", "PA", "LA", "RGBA"):
        image = image.convert("RGB")

    # SECURITY: this lookup used to be eval(f"ImageFilter.{...}"), which
    # executes arbitrary expressions if the name ever comes from an
    # unvalidated caller. getattr resolves the same attribute without
    # evaluating code.
    kernel = getattr(ImageFilter, filter.upper())
    filtered = image.filter(kernel)

    buffer = io.BytesIO()
    filtered.save(buffer, "JPEG")
    buffer.seek(0)  # rewind so the caller reads from the beginning
    return buffer
Asynchronous
import requests
import aiohttp
import time
import asyncio
URL = "http://127.0.0.1:8000/"

# Baseline: send 100 POST requests one after another and time the whole run.
start = time.time()
results = [requests.post(URL).content for _ in range(100)]
print(f"Time: {time.time() - start}")
# Recorded output of one run:
# Time: 0.5055480003356934
# One thousand copies of the local server address, one per request to send.
URLS = ["http://127.0.0.1:8000"] * 1000
async def test(URL):
    """POST to *URL* and return the response body decoded as text.

    Args:
        URL: Address to send the POST request to.

    Returns:
        The response body as a string.
    """
    # The aiohttp session class is ``ClientSession``; the previous
    # ``ClientSessions`` does not exist, so every call raised AttributeError.
    async with aiohttp.ClientSession() as session:
        async with session.post(URL) as resp:
            return await resp.text()
async def _post_all(urls):
    """Run ``test`` for every URL concurrently; return results or exceptions."""
    return await asyncio.gather(
        *(test(url) for url in urls), return_exceptions=True
    )


start = time.time()
# asyncio.run() drives the event loop until every request has finished.
# The earlier version only created the gather() future and never ran the
# loop, so in a script no request was sent and ``results`` was a pending
# future rather than a list.
# Inside an already running loop (e.g. a notebook) use
# ``results = await _post_all(URLS)`` instead.
results = asyncio.run(_post_all(URLS))
print(f"Time: {time.time() - start}")
# The previously recorded "Time: 0.0014939308166503906" covered task
# creation only, not the requests; re-measure with this version.