異步是 Python 3.4 起引入的重大變革,透過 asyncio 模組與新的 asyncawait 語法達到與 JavaScript 類似的異步執行效果,這篇寫一下異步的用法與特性。

一個最簡單的異步程式例子:

import asyncio

async def main():
    print('hello')
    await asyncio.sleep(10)
    print('world')

if __name__ == '__main__':
    asyncio.run(main())

在上面的例子內有幾個 Python 異步程式的特徵:

  • async 關鍵字定義 main() 為異步函式。
  • 函式內有 await 關鍵字的敘述句表示該敘述為異步執行。
  • await 後呼叫的函式 asyncio.sleep() 為一個具有 awaitable 的函式,對 await 關鍵字來說,後面呼叫的函式有 awaitable 屬性是必須的,否則會引發錯誤。
  • asyncio.run() 負責把 main() 丟進 event loop 執行。

協程與 event loop

如果我們直接跑那個 async 定義的 main(),Python 會告訴我們此函式它生成一個 coroutine(協程)物件,並且因為沒有 asyncio 模組幫忙把它丟給 event loop,此時函式不會被執行:

main()
<coroutine object main at 0x1053bb7c8>

從這裡我們可以瞭解到,async 函式會生成 coroutine(協程)物件,它們是反應物與生成物的關係,口語上,我們比較常講「async 函式」,實際上,在 event loop 中被執行的是生成的 coroutine 物件,書面上或口語上可以把它們等而視之。

Event loop 接受並調度一系列的 async 函式(嚴謹的說是協程物件)。不像上面簡化的例子,在真實的場景中,往往有多個工作需要並行前進,例如某個 API 後端,要一邊持續收 message queue 訊息,一邊要回應客戶端請求,又例如要並行下載多個檔案等等,這類涉及網路 IO 與磁碟 IO 的任務統稱為 IO-bound 任務,IO-bound 特性是 IO 存取頻繁,而 CPU 運算強度不高,如同 asyncio 名字所表示,它特別適合處理 IO-bound 類並行任務,反之如果是 CPU-bound 或 GPU-bound 這類運算密集型任務,例如算圖、音訊、視訊、影像處理,則比較適合用多進程解決,發揮 CPU 多核心的特長,或是調用 GPU 運算函式庫處理,當然也有某些應用場景可能既是 IO-bound 又是 CPU-bound 也是 GPU-bound,那它就會同時用上多進程多線程 asyncio + CUDA。

Python event loop 的一項特性是一個線程只能跑一個 event loop,event loop 不能跨線程,但是如上段結尾所提,我可以開很多個線程,每個線程都跑自己的 event loop,當然,要玩多進程又多線程還要跑 asyncio / event loop,就很考驗我們的調度能力了。

async / await

回到上文程式範例,event loop 會自行調度每個協程任務,無需人為介入,執行 await asyncio.sleep(10) 語句時,如果在這個 event loop 中還有其他協程任務時,睡著的這十秒並不會阻塞其他協程任務進行,在睡覺期間,event loop 會叫起別的協程任務運行,如此達到並行運算的效果。

然而因為 await 的關係,Python 會確保下一句 print('world') 在睡完後再執行,因為我們還是要求程式是有序執行的嘛,如果程式主要邏輯變成這樣:

await asyncio.sleep(10)
await asyncio.sleep(10)

那 Python 會睡幾秒?答案是二十秒,它們並沒有「並行睡十秒」,這似乎和前文說的並行運算有所違背,癥結就在那個 await,它保證了程式執行的有序性,問題來了,既然有序,又如何並行?

TaskGroup

從 Python 3.11 起,對於並行協程,引入了 task group 方案,在此之前,什麼 asyncio.create_task()asyncio.gather() 等等的招式在此不提,task group 跑並行協程範例如下,這次我們用比較實際的(假裝)並行下載示範:

import asyncio

async def download(url: str, sleep_duration: int):
  await asyncio.sleep(sleep_duration)
  print(url)
  return url

url1 = 'task1'
url2 = 'task2'

async def main():
  async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(download(url1, 10))
    task2 = tg.create_task(download(url2, 3))
    task1_result = await task1
    task2_result = await task2
    print('All done')
  return (task1_result, task2_result)

if __name__ == '__main__':
    asyncio.run(main())

輸出為:

task2
task1
All done

並且此時總時長為 10 秒,不是 13 秒。

來認識一下 task group:

  • async with xxx 啟動了異步的 context,而 asyncio.TaskGroup() 本身就是異步 context 的生成器,它負責實作異步 context 要求的 aenter()aexit(),如同一般的 context 生成器一樣,只不過前綴了 a 表示 async,異步 context 裡面理所當然的可以執行 await xxx()

  • task1 = tg.create_task(download(url1, 10)) 這句會生成一個 Task 物件,這也是 event loop 的管理對象,一個 coroutine 可以顯式或隱式的被封裝成 Task 物件,然後交由 event loop 管理與執行,這裡我們需要關注的是 Task 物件一旦生成就立刻被執行,因此 task1task2 兩者幾乎是同時執行,也就是並行。

  • await task1 用於取得 task1 的回傳值,如上點所說 task1 本身不是函式,它是個 Task 物件,所以是 await task1,後面不用加括號。await xxx 在這裡不影響兩個任務的進行順序,如上點所說,它們在各自建立為 Task 物件時就已經執行了,所以我們看到先輸出的是只睡三秒的 task2 而不是先 await 的 task1。

Task 是 asyncio 的一個類,顧名思義,就是任務。前面定義的異步函式、coroutine 物件都需要被顯式的或隱式的轉換成 Task 物件才能交給 event loop 去運行,Task 物件可以是一份任務,或者是 N 份任務的清單。

透過 create_task() 去運行多個任務,就可以做到並行的效果。

再舉個實際例子,某個專案要訂閱 MQTT 訊息,把訊息轉入 MongoDB job queue,又要跑 job worker,還要透過 ZeroMQ 把 job 處理結果丟出去,這麼多事摻在一起,至少有兩個無限迴圈要跑:

  • 監看 queue job 的 job worker
  • 監聽 MQTT 訊息的訂閱器

此專案啟動範例如下:

import mongodb
import mqtt
import zeromq

async def main():
  # 先連接到 MongoDB
  await mongodb.initialize()
        
  # 再綁定 ZeroMQ
  await zeromq.bind_address()

  async with asyncio.TaskGroup() as tg:
    # 跑 MongoDB job queue worker
    tg.create_task(job_queue.run())
    
    # 監聽 MQTT 訂閱訊息
    tg.create_task(mqtt.listen())

if __name__ == '__main__':
    asyncio.run(main())

主函式裡面透過 await xxx 確保了會先連線到 MongoDB 與綁定 ZeroMQ,才會去跑後面的 task group,而 task group 內跑 job worker 與接收 MQTT 訊息則是並行發生的,如此保證了該有序執行的有序執行,該並行運行的並行運行。

結語

其實用上 asyncio 很多時候是因為被套件所逼,一旦某套件說它得異步調用,那我們就得一路 async / await 下去,而 asyncio 對 IO-bound 工作究竟有沒有神奇的功效,目前說法莫衷一是,畢竟每個評價方式都不一樣,無論如何 asyncio 還是要用上的,等 Python GIL 封印解除,就可以到達提速五倍的境界了吧。