flask fastapi
for building APIs
flask는 확장성에 문제가 있어, 내부적으로 쓰인다.
multiple request에 대해 Asynchronous Programming
p5-hello-fastapi :
filter /Route and Changes
https://pillow.readthedocs.io/en/stable/
$ git stash
$ git checkout p5-hello-fastapi
$ conda update --all
$ conda install --file requirements.txt
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def index():
    """Answer GET / with a fixed greeting payload."""
    greeting = {"hello": "FastAPI"}
    return greeting
$ uvicorn main:app --reload
$ curl -X GET "http://127.0.0.1:8000"
p6-dictionary-fastapi :
filter /Route and Changes
https://pillow.readthedocs.io/en/stable/
$ git stash
$ git checkout p6-dictionary-fastapi
$ conda update --all
$ conda install --file requirements.txt
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def index():
    """Answer GET / with a fixed greeting payload."""
    payload = {"hello": "FastAPI"}
    return payload
# dictionary-api-python-flask/model/dbHandler.py
import sqlite3 as SQL
from contextlib import closing

# Default location of the SQLite dictionary, relative to the working directory.
DB_PATH = "data/dictionary.db"


def _fetch_entries(sql_query: str, params: tuple, db_path: str) -> list:
    """Run one parameterized query against the dictionary and return all rows."""
    # closing() releases the connection even when execute() raises; a sqlite3
    # connection used directly as a context manager only manages transactions.
    with closing(SQL.connect(db_path)) as db:
        return db.execute(sql_query, params).fetchall()


def match_exact(word: str, db_path: str = DB_PATH) -> list:
    """Return the rows of ``entries`` whose ``word`` column equals *word*.

    Args:
        word: The term to look up.
        db_path: SQLite database file to query; defaults to ``DB_PATH``.

    Returns:
        A list of row tuples, empty when nothing matches.
    """
    return _fetch_entries(
        "SELECT * FROM entries WHERE word=?", (word,), db_path
    )


def match_like(word: str, db_path: str = DB_PATH) -> list:
    """Return the rows of ``entries`` whose ``word`` column contains *word*.

    The term is wrapped in ``%`` wildcards and matched with SQL ``LIKE``, so
    any ``%`` or ``_`` inside *word* also acts as a wildcard.

    Args:
        word: The substring to search for.
        db_path: SQLite database file to query; defaults to ``DB_PATH``.

    Returns:
        A list of row tuples, empty when nothing matches.
    """
    return _fetch_entries(
        "SELECT * FROM entries WHERE word LIKE ?", ("%" + word + "%",), db_path
    )
from typing import Optional

from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
import uvicorn

from model.dbHandler import match_exact, match_like

app = FastAPI()


@app.get("/")
def index():
    """Return a short usage hint pointing at the lookup endpoint."""
    # The lookup route reads its term from the ``words`` query parameter.
    response = {"usages": "/dict?words=<word>"}
    return jsonable_encoder(response)


@app.get("/dict")
def dictionary(words: str):
    """Look up *words* in the dictionary database.

    An exact match is tried first, then a substring match.

    Args:
        words: The term to look up, taken from the ``words`` query parameter.

    Returns:
        A JSON-compatible dict with ``status`` (``"success"`` for an exact
        match, ``"partial"`` for a substring match, ``"error"`` otherwise),
        the queried ``word``, and ``data`` holding either the matching rows
        or an error message.
    """
    if not words:
        response = {"status": "error", "word": words, "data": "word not valid!"}
        return jsonable_encoder(response)

    definitions = match_exact(words)
    if definitions:
        response = {"status": "success", "word": words, "data": definitions}
        return jsonable_encoder(response)

    # Fall back to a substring search only when there is no exact hit.
    definitions = match_like(words)
    if definitions:
        response = {"status": "partial", "word": words, "data": definitions}
        return jsonable_encoder(response)

    response = {"status": "error", "word": words, "data": "word not found"}
    return jsonable_encoder(response)


if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8000)
s7-filter-fastapi :
$ git stash
$ git checkout s7-filter-fastapi
#$ conda update --all
#$ conda install --file requirements.txt
$ conda update -n base -c defaults conda
$ conda install -c conda-forge aiohttp
from bin.filters import apply_filter
from typing import List, Optional
from fastapi import FastAPI, File, UploadFile
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
import io

app = FastAPI()

# Filter names this service accepts; see the PIL documentation for the
# filters that ship out of the box.
filters_available = [
    "blur",
    "contour",
    "detail",
    "edge_enhance",
    "edge_enhance_more",
    "emboss",
    "find_edges",
    "sharpen",
    "smooth",
    "smooth_more",
]


@app.api_route("/", methods=["GET", "POST"])
def index():
    """Describe the service: the accepted filter names and the request format."""
    usage = {"http_method": "POST", "URL": "//"}
    return jsonable_encoder(
        {"filters_available": filters_available, "usage": usage}
    )


@app.post("/{filter}")
def image_filter(filter: str, img: UploadFile = File(...)):
    """Apply the filter named in the path to the uploaded image.

    Returns the filtered image streamed as ``image/jpeg`` when *filter* is
    one of ``filters_available``; otherwise returns a JSON error object.
    """
    if filter in filters_available:
        filtered_image = apply_filter(img.file, filter)
        return StreamingResponse(filtered_image, media_type="image/jpeg")

    return jsonable_encoder({"error": "incorrect filter"})
from PIL import Image, ImageFilter
import io

# Image modes saved as JPEG without conversion. Anything else (for example
# RGBA or palette images from PNG/GIF uploads) is converted to RGB first,
# because Pillow refuses to write those modes as JPEG.
_JPEG_MODES = ("L", "RGB", "CMYK")


def apply_filter(file: object, filter: str) -> object:
    """Apply a named Pillow filter to an image and return it as JPEG bytes.

    Args:
        file: A readable binary file object containing the source image.
        filter: Name of an ``ImageFilter`` constant, case-insensitive
            (e.g. ``"blur"`` selects ``ImageFilter.BLUR``).

    Returns:
        An ``io.BytesIO`` positioned at offset 0 holding the filtered image
        encoded as JPEG.

    Raises:
        AttributeError: If ``ImageFilter`` has no attribute named
            ``filter.upper()``.
    """
    # Plain attribute lookup instead of eval(): the name may come straight
    # from a request, and eval() would execute arbitrary expressions.
    kernel = getattr(ImageFilter, filter.upper())

    image = Image.open(file)
    if image.mode not in _JPEG_MODES:
        image = image.convert("RGB")
    image = image.filter(kernel)

    output = io.BytesIO()
    image.save(output, "JPEG")
    output.seek(0)  # rewind so the caller reads from the first byte
    return output
Asynchronous
import requests
import aiohttp
import time
import asyncio

URL = "http://127.0.0.1:8000/"

# Baseline: issue 100 POST requests one after another and time the whole run.
start = time.time()
results = [requests.post(URL).content for _ in range(100)]
print(f"Time: {time.time() - start}")
# Time: 0.5055480003356934
URLS = ["http://127.0.0.1:8000"] * 1000


async def test(URL, session=None):
    """POST to *URL* and return the response body as text.

    Args:
        URL: Address to send the POST request to.
        session: Optional ``aiohttp.ClientSession`` to reuse. When omitted,
            a temporary session is opened for this single request.
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await test(URL, own_session)
    async with session.post(URL) as resp:
        return await resp.text()


async def main():
    """Send one POST per entry in ``URLS`` concurrently and collect the results.

    Failures are returned in place of the response text rather than raised
    (``return_exceptions=True``).
    """
    # One shared session for all requests instead of one session per request.
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(test(URL, session) for URL in URLS), return_exceptions=True
        )


start = time.time()
# asyncio.run() drives the event loop until every request has finished.
# Inside an already-running loop (e.g. a notebook) use
# `results = await main()` instead.
results = asyncio.run(main())
print(f"Time: {time.time() - start}")
# NOTE: the previously recorded "Time: 0.0014939308166503906" only measured
# task creation -- the gather() future was never awaited, so no request was
# sent. Re-measure with this version.