Async

Published onesixx on

https://docs.python.org/ko/3/glossary.html#term-coroutine, PEP 492참조

https://docs.python.org/ko/3/library/asyncio-task.html#coroutines

https://brownbears.tistory.com/490 

coroutine (코루틴)

Sub-routine의 더 일반화된 형태
asyncio 응용 프로그램을 작성하는 기본 방법
동시성 프로그래밍을 가능케 하도록 만든 개념

이것들은 async def 문으로 구현할 수 있습니다. 

sub-routine vs. co-routine

  • sub-routine은 한 지점에서 진입하고 다른 지점에서 탈출합니다.
  • co-routine은 여러 다른 지점에서 진입하고, 탈출하고, 재개할 수 있습니다.

Thread vs. co-routine

coroutine은 나온지 꽤나 오래된 기술이나 스레드가 나타나면서 방치가 되었었습니다.
이후 스레드의 자원 경쟁, 데드락 등의 문제로 인해 다시 주목받고 있습니다
코루틴은 별도의 Thread없이 메인 Thread상에서 번갈아가며, 병렬처리와 유사한 동작을 수행할 수 있기 때문.

스레드는 비동기로, 여러 스레드가 있다면 한꺼번에 동시에 실행된다.

코루틴은 프로그램이 실행 중일 때, 특정 시점에 코루틴으로 이동하고 그 전에 진행 중이던 루틴은 정지합니다. 즉, 한번에 하나의 코드만 실행됩니다. 이는 기존의 프로그래밍과 유사한 성격으로 보일 수 있습니다.
하지만 기존의 프로그래밍은 에러가 나지 않는 이상 실행 중인 코드를 빠져나올 수 있는 부분은 return과 같이 가장 마지막 부분이지만 코루틴은 실행 중간에 해당 함수를 빠져 나와 다른 코드를 실행할 수 있고 다시 실행 중이였던 코드 라인으로 이동할 수도 있습니다.
다시 말해서 코루틴은 yield 키워드로 실행을 중지할 수 있고, yield 키워드를 호출했던 곳으로 와서 실행을 재시작합니다.

싱글코어에선 코루틴은 이동 시점이 더 잘 조절되고 context switching이 적어, 성능면에서 멀티 스레드보다 좋을 수 있지만 멀티코어 프로세서를 활용할 수 없다는 문제점이 있으며 스레드보다 성능이 떨어지게 됩니다.

coroutine function (코루틴 함수)

co-routine객체를 돌려주는 함수.

코루틴 함수는 async def 문으로 정의될 수 있고,
await 와 async for와 async with 키워드를 포함할 수 있습니다.

asyncio 응용 프로그램을 작성하는 기본 방법

async/await 문법으로 선언된 co-routine

ex) 《hello》를 인쇄하고, 1초 동안 기다린 다음, 《world》를 인쇄

import asyncio


async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(
    main()
)
>>>asyncio.run( main() )
RuntimeError: RuntimeError: asyncio.run() cannot be called from a running event loop
 
>>> main()


>>> await main()

함수는 호출될수 없다. 다른 asyncio 이벤트루프가 같은 thread에 running하고 있는경우.

아마도, Jupyter에서 이벤트루프가 running중.

In your case, jupyter (IPython ≥ 7.0) is already running an event loop:

위 예제는 태스크가 1개이므로 동기 프로그래밍과 크게 다르지 않습니다.
아래는 2개의 태스크를 동기와 비동기로 실행한 코드입니다.

[Python] asyncio 파헤치기 (python 3.8기준)

asyncio란?

https://docs.python.org/ko/3.8/library/asyncio.html

비동기 프로그래밍을 위한 모듈

파이썬 3.5부터 지원

동기란 빨래를 시작하고 종료가 되면 설거지를 시작하고 완료가 되면 TV를 보는 것처럼 한 번에 하나의 작업을 하는 것이고,

비동기는 빨래를 시작시키고 설거지를 하면서 TV를 보는 것과 같이 여러 작업을 동시에 하는 것과 같은 행동을 하는 것입니다.

하지만 파이썬에서는 GIL(Global interpreter Lock) 때문에 비동기 프로그래밍이 동기 프로그래밍보다 느릴 수도 있습니다.

asyncio는 이벤트 루프와 코루틴을 기반으로 동작하며

데이터를 요청하고 응답을 기다리는 I/O bound한 작업에서 효율적입니다.

코루틴 기반이므로 멀티 스레드와 비교하여 문맥교환에 따른 비용이 다소 적게 들어갑니다.

이벤트 루프란?

이벤트 루프는 작업들을 반복문 돌면서 하나씩 실행을 시킵니다.

이때, 실행한 작업이 데이터를 요청하고 응답을 기다린다면 다른 작업에게 event loop에 대한 권한을 넘깁니다.
권한을 받은 event loop는 다음 작업을 실행하고 응답을 받은 순서대로 대기하던 작업부터 다시 권한을 가져와 작업을 마무리합니다.

공식 문서: https://docs.python.org/ko/3.8/library/asyncio-eventloop.html

asyncio

코루틴으로 태스크를 만들었다면, 
asyncio.get_event_loop함수를 활용해 이벤트 루프를 정의하고
run_until_complete으로 실행시킬 수 있습니다.

함수 앞에 async를 붙이면, asyncio를 사용하게 되고, 코루틴이 만들어짐.
I/O가 발생하거나 권한을 다른 작업에 넘기고 싶을 때엔, 해당 로직 앞에 await을 붙입니다.

이때 await 뒤에 오는 코드는 코루틴으로 작성된 코드여야 합니다.
만약 await 뒤에 time.sleep과 같이 사용한다면 스레드가 중단되므로 의도한대로 동작하지 않습니다.
따라서 코루틴으로 구현되어 있는 asyncio.sleep을 사용해야 합니다.
밑에서 설명하겠지만 코루틴으로 만들어진 모듈이 아니라면 (requests, psycopg, django orm 등등) await을 붙여도 소용이 없습니다.

동기

비동기

import asyncio
import time

async def async_task_1():
    print('async_task_1 시작')
    print('sync_task_1 3초 대기')
    await asyncio.sleep(3)
    print('sync_task_1 재시작')

async def async_task_2():
    print('async_task_2 시작')
    print('sync_task_2 2초 대기')
    await asyncio.sleep(2)
    print('sync_task_2 재시작')

async def main():
    start = time.time()
    await async_task_1()
    await async_task_2()
    end = time.time()
    print(f'time taken: {end - start}')

asyncio.run(main())

# async_task_1 시작
# sync_task_1 3초 대기
# sync_task_1 재시작
# async_task_2 시작
# sync_task_2 2초 대기
# sync_task_2 재시작
# time taken: 5.004269123077393

asyncio를 쓰면 더 빠르게 처리되어야 하는데 시간이 동일하게 걸렸습니다. 위와 같이 코루틴 함수 여러개를 한번에 실행해야 하는데 await 코루틴함수() 이런 식으로 나열하면 코루틴을 호출하는 것이지 다음 태스크를 실행하도록 예약하는 행동은 아닙니다. 여러 코루틴을 동시에 실행하여 원하는 동작을하기 위해선 아래와 같이 create_task()로 등록해야 합니다.

import asyncio
import time

async def async_task_1():
    print('async_task_1 시작')
    print('sync_task_1 3초 대기')
    await asyncio.sleep(3)
    print('sync_task_1 재시작')

async def async_task_2():
    print('async_task_2 시작')
    print('sync_task_2 2초 대기')
    await asyncio.sleep(2)
    print('sync_task_2 재시작')

async def main():
    start = time.time()
    task1 = asyncio.create_task(async_task_1())
    task2 = asyncio.create_task(async_task_2())
    await task1
    await task2
    end = time.time()
    print(f'time taken: {end - start}')

asyncio.run(main())

# async_task_1 시작
# sync_task_1 3초 대기
# async_task_2 시작
# sync_task_2 2초 대기
# sync_task_2 재시작
# sync_task_1 재시작
# time taken: 3.0023632049560547

현재 파이썬 3.8을 사용해서 손쉽게 asyncio.run(메인함수())를 사용하는데 이하 버전에서는 이벤트 루프에서 이벤트 루프를 가져온 다음 실행시켜야 합니다. asyncio.run() 함수 내부를 보면 다음과 같습니다.

def run(main, *, debug=None):
    """Execute the coroutine and return the result.

    This function runs the passed coroutine, taking care of
    managing the asyncio event loop and finalizing asynchronous
    generators.

    This function cannot be called when another asyncio event loop is
    running in the same thread.

    If debug is True, the event loop will be run in debug mode.

    This function always creates a new event loop and closes it at the end.
    It should be used as a main entry point for asyncio programs, and should
    ideally only be called once.

    Example:

        async def main():
            await asyncio.sleep(1)
            print('hello')

        asyncio.run(main())
    """
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        if debug is not None:
            loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()

다른 asyncio 이벤트 루프가 동일한 스레드에서 실행 중일 때, asyncio.run() 함수를 호출한다면 에러가 발생합니다. 해당 함수를 호출하면 항상 새 이벤트 루프를 만들고 다 사용한 다음, 이벤트 루프를 종료 시킵니다. 따라서 asyncio 프로그램을 짤 때, 해당 함수는 메인 진입 지점으로 사용하고 1번만 호출하는 것이 좋습니다.

동시에 실행하기

위 예제 중, asyncio.create_task()로 각 awaitable 함수를 등록해서 하는 방법도 있지만 등록해야 하는 함수가 많다면 아래와 같이 asyncio.gather()를 사용하는 것이 좋습니다.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # 동시에 3개를 예약
    result = await asyncio.gather(factorial("A", 2), factorial("B", 3), factorial("C", 4))
    # 또는
    # result = await asyncio.gather(*[factorial("A", 2), factorial("B", 3), factorial("C", 4)])

    print(result)

asyncio.run(main())

# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
# [2, 6, 24]

등록된 모든 코루틴 함수가 성공했다면 등록된 순서대로 결과값이 리스트로 반환됩니다. asyncio.gather() 함수의 세번째 파라미터는 return_exceptions로 기본값은 False이며 등록된 코루틴 함수 중 1개라도 에러가 발생하면 즉시 중단시키고 에러를 발생시킵니다. True로 설정하면 에러가 난 코루틴 함수는 실행되지 않고 결과 리스트에 에러 정보를 담아서 반환합니다.

import asyncio

async def factorial(name, number):
    f = 1
    if number == 4:
        raise ValueError('에러')
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # 동시에 3개를 예약
    result = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
        return_exceptions=True
    )

    print(result)

asyncio.run(main())

# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task B: factorial(3) = 6
# [2, 6, ValueError('에러')]

시간제한두기

만약 코루틴 함수에 시간제한을 두고 싶다면 asyncio.wait_for() 함수를 사용하여 아래와 같이 사용할 수 있습니다.

import asyncio

async def eternity():
    await asyncio.sleep(4)
    print('살았다.')

async def main():
    try:
        # 대기 시간이 1초가 넘어가면 에러처리
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())
# timeout!

해당 함수는 실제로 취소될 때까지 대기하므로 총 대기 시간이 timeout을 넘을 수도 있습니다.

awaitable

await 표현식에서 사용할 수 있는 객체를 awaitable 객체라고 칭합니다. 이러한 객체는 코루틴(coroutine), 태스크(task), 퓨처(future)가 있습니다.

코루틴

코루틴은 awaitable 객체이므로 다른 코루틴에서 호출할 수 있습니다.

async def nested():
    return 42

async def main():
    # 코루틴 함수를 await을 안붙이고 호출하면 호출되지 않음
    # 코루틴은 생성이 되지만 await하지 않음 -> 그래서 아무것도 실행하지 않는다.
    nested()

    # 42 반환됨
    print(await nested()) 

asyncio.run(main())

여기서 코루틴이란 용어는 코루틴 함수와 코루틴 객체라는 두 의미를 내포하는 것을 알 수 있습니다.

  • 코루틴 함수: async def ~~ 로 정의된 함수
  • 코루틴 객체: 코루틴 함수를 호출하고 반환되는 객체

asyncio는 이전 버전인 제너레이터 기반 코루틴도 지원한다고 하지만 3.10부터는 사라질 기술이므로 사용하지 않는 것이 좋아 보입니다.

태스크

태스크는 코루틴을 동시에 예약하는데 사용됩니다. 위 예제에서 사용했던 asyncio.create_task()와 같은 함수를 사용해 코루틴이 실행되도록 자동으로 예약합니다.

async def nested():
    return 42

async def main():
    # nested()함수가 동시에 실행되도록 예약
    task = asyncio.create_task(nested())

    # task 변수를 취소하거나 완료될때까지 대기
    await task

asyncio.run(main())

퓨처

퓨처는 비동기 연산의 최종 결과를 나타내는 저수준 awaitable 객체입니다. 현재까지 asyncio를 사용하는데 퓨처 객체를 어디서 사용해야 되는지 정확하게 이해가 되지 않습니다. asyncio와 퓨처를 사용하는 좋은 예는 loop.run_in_executor() 입니다.

아래 예제는 run_in_executor() 예제인데 보면 알겠지만 asyncio를 사용한 코루틴 동작이 아닌, 멀티 스레드나 멀티 프로세스를 활용한 방식입니다. 코루틴이 아니므로 당연히 문맥 교환 비용이 발생합니다. 장점이라면 일반 함수를 수정하지 않고 비동기로 동작하게끔 만들 수 있습니다.

  • 아직까지 퓨처 객체만 사용해서 스레드나 프로세스를 사용하는 방식보다 asyncio + 퓨처를 사용하는데에 이점이 무엇인지를 모르겠음..
import asyncio
import concurrent.futures

def blocking_io():
    # 로깅과 같은 파일 작업은 이벤트 루프를 차단할 수 있으므로 스레드 풀에서 실행합니다.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU bound 작업은 이벤트 루프를 차단하므로 프로세스 풀에서 실행하는 것이 좋습니다.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    # 첫 번째 인자가 None이면 자체적으로 ThreadPoolExecutor를 생성 - 테스트 했을 때, worker가 41개 등록됐음 
    result = await loop.run_in_executor(None, blocking_io)
    print('default thread pool', result)

    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print('custom thread pool', result)

    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

관련 문서: https://docs.python.org/ko/3.8/library/asyncio-future.html#asyncio.Future

asyncio를 사용해 API 호출하기

많이 사용하는 requests 모듈은 코루틴 기반으로 만들어진 라이브러리가 아니기 때문에 asyncio를 사용해 API 호출을 하려면 aiohttp 라는 새로운 라이브러리를 사용해야 합니다.

아래는 aiohttp를 사용해 API를 호출한 예시입니다. 아래에 나오는 uri는 경로에 숫자를 입력하면 입력된 숫자(초 단위) 만큼 결과 반환이 지연됩니다.

from time import time
import aiohttp
import asyncio

async def call(session, delay):
    print(f'call {delay}짜리 시작')
    # 전달 받은 session으로 async 처리
    async with session.get(f'http://httpbin.org/delay/{delay}') as response:
        result = await response.json()
        print(f'call {delay}짜리 끝')
        return result

async def main():
    # 타이머 시작
    start = time()

    # 비동기 처리 세션 생성
    async with aiohttp.ClientSession() as session:
        # 작업 동시 예약
        two_task = asyncio.create_task(call(session, 2))
        three_task = asyncio.create_task(call(session, 3))

        # await asyncio.sleep(1)
        # 다른 작업도 가능

        result1 = await two_task
        result2 = await three_task

        end = time()
        print(end - start)

asyncio.run(main())

# call 2짜리 시작
# call 3짜리 시작
# call 2짜리 끝
# call 3짜리 끝
# 3.465127944946289

더 자세한 예시는 https://docs.aiohttp.org/en/stable/에 잘 설명되어 있습니다.

asyncio를 사용해 Database 사용하기

현재 asyncio를 지원하는 라이브러리는 PostgreSQL, MySQL, SQLite 3가지입니다. 여기서 asyncpgaiomysqlaiosqlite 와 같이 raw 쿼리를 작성해서 사용할 수도 있고 이를 한 번 더 감싼 https://github.com/encode/databases 모듈로 간편하게 사용할 수도 있습니다.

아래는 https://github.com/encode/databases 에서 sqlite를 접속하여 사용하는 예시입니다.

# Create a database instance, and connect to it.
from databases import Database
database = Database('sqlite:///example.db')
await database.connect()

# Create a table.
query = """CREATE TABLE HighScores (id INTEGER PRIMARY KEY, name VARCHAR(100), score INTEGER)"""
await database.execute(query=query)

# Insert some data.
query = "INSERT INTO HighScores(name, score) VALUES (:name, :score)"
values = [
    {"name": "Daisy", "score": 92},
    {"name": "Neil", "score": 87},
    {"name": "Carol", "score": 43},
]
await database.execute_many(query=query, values=values)

# Run a database query.
query = "SELECT * FROM HighScores"
rows = await database.fetch_all(query=query)
print('High Scores:', rows)

좋아요7공유하기글 요소

Categories: Python Basic

onesixx

Blog Owner

Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments
0
Would love your thoughts, please comment.x
()
x