Mastering Python’s Asyncio: A Practical Guide

The asyncio toolkit is Python's answer to writing clean, efficient, and scalable code for concurrent I/O operations.

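First of all, asyncio is about writing code that can handle multiple tasks at once without actually running them at the same instant. Think of a chef in a kitchen who starts a stew, knows it needs time to simmer, and prepares the salad in the meantime instead of standing around waiting. That is the essence of asynchronous programming: keep working efficiently and avoid unnecessary waiting.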

Event Loop: The central execution mechanism provided by asyncio. It handles events, schedules asynchronous routines, and decides which task runs next.

Coroutines: Asynchronous functions declared with async def. These functions can be paused and resumed at await points, allowing I/O operations to run in the background.

Futures: Objects that represent the result of work that has not yet been completed. Low-level asyncio APIs return them, and awaiting one suspends the caller until the result is set.

Tasks: Coroutines scheduled on the event loop and wrapped in a Future (Task is a subclass of Future), which lets them run concurrently with other tasks.
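
To make these four terms concrete, here is a minimal sketch. The function name compute and its return values are made up for illustration; only the asyncio calls are part of the standard library.

```python
import asyncio

async def compute(x):
    # A coroutine function: calling it only creates a coroutine object.
    await asyncio.sleep(0.01)  # pause point; the event loop can run other work here
    return x * 2

async def main():
    coro = compute(1)                  # coroutine object, not started yet
    kind = type(coro).__name__         # "coroutine"

    task = asyncio.create_task(compute(21))       # scheduled on the running event loop
    is_future = isinstance(task, asyncio.Future)  # Task is a subclass of Future

    first = await coro    # runs the coroutine to completion
    second = await task   # waits for the already scheduled Task
    return kind, is_future, first, second

# asyncio.run() creates the event loop, runs main(), and closes the loop.
result = asyncio.run(main())
print(result)  # ('coroutine', True, 2, 42)
```

Note that the Task starts running as soon as main() reaches its first await, while the bare coroutine object does nothing until it is awaited.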

await

The await keyword in Python is an essential part of asynchronous programming, introduced in Python 3.5. It pauses the execution of an async function until an awaitable object (such as a coroutine, Task, or Future) completes, allowing other tasks to run in the meantime. This is what makes asyncio efficient for I/O-bound work and high-level structured network code.

  • await can only be used inside async functions.
  • Its primary purpose is to yield control back to the event loop, suspending the execution of the enclosing coroutine until the awaited object is resolved. This non-blocking behavior is what makes asynchronous programming efficient, especially for I/O-bound tasks.
  • The objects that can be used with await must be awaitable. The most common awaitables are coroutines declared with async def, but others include asyncio Tasks, Futures, or any object with an __await__() method.
import asyncio

async def say_hello_async():
    await asyncio.sleep(2)  # Simulates waiting for 2 seconds
    print("Hello, Async World!")

async def do_something_else():
    print("Starting another task...")
    await asyncio.sleep(1)  # Simulates doing something else for 1 second
    print("Finished another task!")

async def main():
    # Schedule both tasks to run concurrently
    await asyncio.gather(
        say_hello_async(),
        do_something_else(),
    )

asyncio.run(main())

Output:

Starting another task...
Finished another task!
Hello, Async World!
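
Besides coroutines, Tasks, and Futures, any object that defines an __await__() method can be awaited. The sketch below is one possible way to write such an object; the class name Delay and its parameters are hypothetical and exist only for this illustration.

```python
import asyncio

class Delay:
    """Hypothetical awaitable that yields a value after a short pause."""

    def __init__(self, seconds, value):
        self.seconds = seconds
        self.value = value

    def __await__(self):
        # Delegate to an existing awaitable and hand back its iterator.
        # asyncio.sleep(delay, result) returns `result` once the delay is over.
        return asyncio.sleep(self.seconds, self.value).__await__()

async def main():
    # `await` works because Delay implements __await__().
    return await Delay(0.01, "done")

outcome = asyncio.run(main())
print(outcome)  # done
```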

aiohttp

aiohttp provides asynchronous HTTP requests, which makes it a natural fit for running I/O tasks concurrently.

import aiohttp
import asyncio
import time

async def fetch_async(url, session):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        page1 = asyncio.create_task(fetch_async('http://example.com', session))
        page2 = asyncio.create_task(fetch_async('http://example.org', session))
        await asyncio.gather(page1, page2)

start_time = time.time()
asyncio.run(main())
print(f"Done in {time.time() - start_time} seconds")

# Output: Done in 0.2990539073944092 seconds
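
To see why running requests concurrently pays off without depending on the network, here is a rough sketch that swaps the HTTP call for asyncio.sleep. The names fake_fetch, sequential, concurrent, and timed, as well as the 0.2 second delay, are assumptions made for this illustration.

```python
import asyncio
import time

async def fake_fetch(name, delay):
    # Stand-in for an HTTP request; asyncio.sleep simulates network latency.
    await asyncio.sleep(delay)
    return f"{name} body"

async def sequential():
    # Each request waits for the previous one to finish.
    return [await fake_fetch("a", 0.2), await fake_fetch("b", 0.2)]

async def concurrent():
    # Both requests wait at the same time; gather preserves argument order.
    return await asyncio.gather(fake_fetch("a", 0.2), fake_fetch("b", 0.2))

def timed(coro_fn):
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start

seq_result, seq_time = timed(sequential)
con_result, con_time = timed(concurrent)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential version takes roughly the sum of the delays, while the concurrent version takes roughly the longest single delay.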

aiofiles

aiofiles provides asynchronous file operations, so reading and writing files does not block the event loop.

Mixing Async and Sync: A Hybrid Approach

Sometimes you can’t escape synchronous functions but still want to enjoy the async ride. Here’s how to integrate synchronous functions into an asynchronous environment using Python’s asyncio library:

import asyncio
import time

def sync_task():
    print("Starting a slow sync task...")
    time.sleep(5)  # Simulating a long task
    print("Finished the slow task.")

async def do_something_else():
    print("Starting another task...")
    await asyncio.sleep(1)  # Simulates doing something else for 1 second
    print("Finished another task!")

async def async_wrapper():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, sync_task)

async def main():
    await asyncio.gather(
        async_wrapper(),
        do_something_else(),
        # Imagine other async tasks here
    )

asyncio.run(main())

Output:

Starting a slow sync task...
Starting another task...
Finished another task!
Finished the slow task.
  • loop.run_in_executor(None, sync_task) schedules sync_task to run in a separate thread or process, depending on the executor used. The default executor (None specified as the first argument) runs tasks in a thread pool.
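
On Python 3.9 and later, asyncio.to_thread offers a shorter way to do the same thing with the default thread pool. The following is a minimal sketch; the function blocking_io and its labels are invented for the example.

```python
import asyncio
import threading
import time

def blocking_io(label):
    # Ordinary synchronous code; time.sleep would block the event loop
    # if it were called directly from a coroutine.
    time.sleep(0.1)
    return label, threading.current_thread().name

async def main():
    loop_thread = threading.current_thread().name
    # Each call runs in a worker thread while the event loop stays free.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, "first"),
        asyncio.to_thread(blocking_io, "second"),
    )
    return loop_thread, results

loop_thread, results = asyncio.run(main())
for label, worker in results:
    print(f"{label} ran in {worker}; the event loop ran in {loop_thread}")
```

If you need control over the number of threads, you can instead pass your own concurrent.futures.ThreadPoolExecutor as the first argument to loop.run_in_executor.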

Why Is This Approach Needed?

  • Integration of Legacy Code: In real-world applications, you often encounter legacy code that is synchronous in nature. Rewriting large codebases for async compatibility is not always feasible. This approach allows you to integrate such code into your async applications seamlessly.

  • Working with Blocking I/O: Some operations, especially those involving blocking I/O, don’t have asynchronous equivalents, or you might be working with third-party libraries that only offer synchronous functions. This technique allows those operations to be offloaded to a thread, freeing the event loop to handle other async tasks.

  • CPU-bound Tasks: Although CPU-bound tasks are usually better handled by multiprocessing due to Python’s Global Interpreter Lock (GIL), you might sometimes choose to run them in threads for simplicity or because the computational overhead is not excessively high. Using run_in_executor allows these tasks to coexist with I/O-bound asynchronous tasks.

The Future() Object

In Python’s asynchronous programming model, a Future is a low-level awaitable object that represents an eventual result of an asynchronous operation. When you create a Future, you're essentially declaring a placeholder for a result that will be available at some point in the future.

Understanding Futures

A Future here is very similar to a Promise in JavaScript: both have three states (pending, finished, and failed). Python's asynchronous model is also essentially the same as JavaScript's, namely a single thread plus an event loop.

  • Role: Futures are used to bridge low-level asynchronous operations with high-level asyncio applications. They provide a way to manage the state of an asynchronous operation: pending, finished (with a result), or failed (with an exception).
  • Usage: Typically, you don’t need to create Futures yourself when using high-level asyncio functions and constructs (like Tasks, which are a subclass of Future). However, understanding Futures is essential for interfacing with lower-level async APIs or when building complex asynchronous systems.
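
As an illustration of the bridging role, here is a small sketch in which a callback fired by the event loop completes a Future that a coroutine is awaiting. The function name call_legacy and the "payload" value are hypothetical; loop.call_later stands in for whatever callback-based API you might be wrapping.

```python
import asyncio

async def call_legacy():
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # a pending Future bound to this loop

    # Simulate a callback-style API: the loop calls set_result a bit later.
    loop.call_later(0.01, future.set_result, "payload")

    states = [future.done()]      # False: still pending
    value = await future          # suspends until set_result is called
    states.append(future.done())  # True: finished with a result
    return value, states

value, states = asyncio.run(call_legacy())
print(value, states)  # payload [False, True]
```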

Working with Futures

A Future object has several key methods and properties:

  • set_result(result): Sets the result of the Future. This will mark it as done and notify all awaiting coroutines.
  • set_exception(exception): Sets an exception as the result of the Future. This also marks it as done but will raise the exception when awaited.
  • add_done_callback(callback): Adds a callback function to be called when the Future is done (either completed with a result or an exception).
  • result(): Returns the result of the Future. If the Future is not done, it will raise an InvalidStateError. If the Future is completed with an exception, this method will re-raise the exception.
  • done(): Returns True if the Future is done. A Future is considered done if it has a result or an exception.
import asyncio

# A function to simulate an asynchronous operation using a Future
async def async_operation(future, data):
    await asyncio.sleep(1)  # Simulate some async work with a delay
    
    # Set the result or exception based on the input data
    if data == "success":
        future.set_result("Operation succeeded")
    else:
        future.set_exception(RuntimeError("Operation failed"))

# A callback function to be called when the Future is done
def future_callback(future):
    try:
        print("Callback:", future.result())  # Attempt to print the result
    except Exception as exc:
        print("Callback:", exc)  # Print the exception if there was one

async def main():
    # Create a Future bound to the running event loop
    future = asyncio.get_running_loop().create_future()
    
    # Add a callback to the Future
    future.add_done_callback(future_callback)
    
    # Start the asynchronous operation and pass the Future
    await async_operation(future, "success")  # Try changing "success" to anything else to simulate a failure
    
    # Check if the Future is done and print its result
    if future.done():
        try:
            print("Main:", future.result())
        except Exception as exc:
            print("Main:", exc)

# Run the main coroutine
asyncio.run(main())
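
The two error paths described above can be checked with a short sketch. The variable names and the observations list are made up for the illustration.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    observations = []

    # Calling result() on a Future that is still pending raises InvalidStateError.
    pending = loop.create_future()
    try:
        pending.result()
    except asyncio.InvalidStateError:
        observations.append("pending: InvalidStateError")
    pending.cancel()  # tidy up the Future we never intend to complete

    # A Future completed with set_exception re-raises that exception when awaited.
    failed = loop.create_future()
    failed.set_exception(RuntimeError("Operation failed"))
    try:
        await failed
    except RuntimeError as exc:
        observations.append(f"failed: {exc}")

    return observations

observations = asyncio.run(main())
print(observations)
```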

So when to use which?

IO bound problems:

Use async if your libraries support it; if not, use threading. Check whether the I/O library you rely on offers async calls before deciding.

CPU bound problems:

Use multiprocessing.