asyncio python
- History
- greenlet
- yield
- python3.4 asyncio
yield from asyncio.sleep(1)
yield
allow switch to other coroutines if blocked on IO
-
python3.5 (async, await), 3.7(run)
- await, async
-
Asynchronous Programming
-
event loop
- Basic of coroutines - {:height 181, :width 346} - async - coroutine function and coroutine object- await - If Python encounters anasync def cotask(): # a coroutine task yield from asyncio.sleep(1) result = cotask() # a coroutine object asyncio.get_event_loop().run_until_complete(cotask()) # run the task asyncio.run(cotask()) # python 3.7
await f()
expression in the scope ofg()
,await
tells the event loop, “Suspend execution ofg()
until whatever I’m waiting on —the result off()
—is returned. In the meantime, go let something else run.” - Waitable objects can be: - coroutine object - ((6512ae22-02c0-42c1-bb30-97094aae783d)) - Task object - Two coroutines depends on each other- Future - Future is an awaitable object. Coroutines can await on Future objects until they either have a result or an exception set, or until they are cancelled. A Future can be awaited multiple times and the result is same. - Typically Futures are used to enable low-level callback-based code (e.g. in protocols implemented using asyncio transports) to interoperate with high-level async/await code. - exampleimport asyncio async def others() await async.sleep(1) async def others1() await async.sleep(1) async def dep_others(): await others() await other1() asyncio.run(dep_others())
- Set a futureasync def main(): loop = asyncio.get_running_loop() # current evloop future = loop.create_future() # an empty future object/task await future # wait until future object/task finished in this case it will wait for ever asyncio.run(main())
- concurrent.futures - Theasync def set_future(future): await asyncio.sleep(1) future.set_result("666") async def main(): loop = asyncio.get_running_loop() # current evloop future = loop.create_future() # an empty future object/task await loop.create_task(set_future(future)) data = await future # wait until future object/task finished in this case it will wait for ever print(data) # 666 asyncio.run(main())
concurrent.futures
module is part of the Python standard library starting from Python 3.2. It provides a high-level interface for asynchronously executing callable objects using threads or processes. It builds upon the concepts of threading and multiprocessing but offers a simpler, more abstracted interface. ThreadPoolExecutor example```python import concurrent.futures # Define a simple function to be executed def task(n): return n ** 2 # Create a ThreadPoolExecutor with maximum 2 worker threads with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: # Submit tasks to the executor results = [executor.submit(task, i) for i in range(5)] # Retrieve results as they become available for future in concurrent.futures.as_completed(results): result = future.result() print(result) # Create a ProcessPoolExecutor with maximum 2 worker processes with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: # Submit tasks to the executor results = [executor.submit(task, i) for i in range(5)] # Retrieve results as they become available for future in concurrent.futures.as_completed(results): result = future.result() print(result) ```
-
asyncio object are not thread safe and it should not use together with multithread object. Here is a hack to convert a concurrent future to a asyncio feature
reference: Executing code in thread or process pools -import requests # doesnt support asyncio feature import asyncio async def download(url): loop = asyncio.get_event_loop() feautre=loop.run_in_executor(None, request.get, url) response = await future print(response) task = [download(url) for url in rul_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
run_in_executor
- if a io request does not support asyncio API. It can be wrapped withrun_in_executor
- Above create a task The executor argument should be an concurrent.futures.Executor instance. The default executor is used if executor is None. - - Tasks - Syntax
create_task
is equal to create+scheduleToRun. You do not need to kickoff the task - The methodcreate_task
takes a coroutine object as a parameter and returns aTask
object, which inherits fromasyncio.Future
. The call creates the task inside the event loop for the current thread, and starts the task executing at the beginning of the coroutine’s code-block. The returned future will be marked asdone()
only when the task has finished execution. As you might expect the return value of the coroutine’s code block is theresult()
which will be stored in the future object when it is finished (and if it raises then the exception will be caught and stored in the future). - Example:- Video resource {{video(https://www.bilibili.com/video/BV1NA411g7yf?p=7&vd_source=3beef1bd86c86cf14f277319e599dab9)}} - Async Queue - Asynchronous Iterators - ((65137d16-a06a-40e4-b28e-5fa4474017d5)) - Sampleimport asyncio async def counter(name: str): for i in range(0, 100): print(f"{name}: {i!s}") await asyncio.sleep(0) async def main(): tasks = [] for n in range(0, 4): tasks.append(asyncio.create_task(counter(f"task{n}"))) while True: tasks = [t for t in tasks if not t.done()] if len(tasks) == 0: return await tasks[0] #another way to wait done, pending = await asyncio.wait(tasks, timeout=None) # and you can also try this asyncio.run(asyncio.wait([counter('a'), counter('b')])) asyncio.run(main())
- Asynchronous contex manager - async ctx mgr provide a context manager that can be suspended when entering and exiting. This is achieved by usingclass Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i async def ticker(delay, to): """Yield numbers from 0 to `to` every `delay` seconds.""" for i in range(to): yield i await asyncio.sleep(delay)
async with
. It is same way aswith
been used in other python expressions (e.g. file open) - How to use- It must be used inside a coroutine# create and use an asynchronous context manager async with AsyncContextManager() as manager: ... # Same as: # create or enter the async context manager manager = await AsyncContextManager() try: # ... finally: # close or exit the context manager await manager.close()
async def
- uvloop - uvloop GitHub - MagicStack/uvloop: Ultra fast asyncio event loop. is 2~3 times faster than asyncio - Using uvloop- It used by asgi uvicorn - Practical examples - redisimport asyncio import sys import uvloop async def main(): # Main entry-point. ... if sys.version_info >= (3, 11): with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner: runner.run(main()) else: uvloop.install() asyncio.run(main())
- Mysql with aiomysqlimport redis.asyncio as redis r = await redis.from_url("redis://localhost") async with r.pipeline(transaction=True) as pipe: ok1, ok2 = await (pipe.set("key1", "value1").set("key2", "value2").execute()) assert ok1 assert ok2 # pubsub import asyncio import redis.asyncio as redis STOPWORD = "STOP" async def reader(channel: redis.client.PubSub): while True: message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: print(f"(Reader) Message Received: {message}") if message["data"].decode() == STOPWORD: print("(Reader) STOP") break r = redis.from_url("redis://localhost") async with r.pubsub() as pubsub: await pubsub.subscribe("channel:1", "channel:2") future = asyncio.create_task(reader(pubsub)) await r.publish("channel:1", "Hello") await r.publish("channel:2", "World") await r.publish("channel:1", STOPWORD) await future import asyncio_redis @asyncio.coroutine def example(): # Create Redis connection connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379) # Set a key yield from connection.set('my_key', 'my_value') # When finished, close the connection. connection.close() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(example())
¶import asyncio import sqlalchemy as sa from aiomysql.sa import create_engine metadata = sa.MetaData() tbl = sa.Table('tbl', metadata, sa.Column('id', sa.Integer, primary_key=True), sa.Column('val', sa.String(255))) async def go(loop): engine = await create_engine(user='root', db='test_pymysql', host='127.0.0.1', password='', loop=loop) async with engine.acquire() as conn: await conn.execute(tbl.insert().values(val='abc')) await conn.execute(tbl.insert().values(val='xyz')) async for row in conn.execute(tbl.select()): print(row.id, row.val) engine.close() await engine.wait_closed() loop = asyncio.get_event_loop() loop.run_until_complete(go(loop))
-¶
-¶
- - References - 协程到底是咋回事?asyncio大佬给你彻底讲明白。 - Python Asyncio Part 2 – Awaitables, Tasks, and Futures | cloudfit-public-docs - Python Asyncio Part 3 – Asynchronous Context Managers and Asynchronous Iterators | cloudfit-public-docs - Async IO in Python: A Complete Walkthrough – Real Python - -
-