異步是 Python 3.4 起引入的重大變革,透過 asyncio 模組與新的 async、await 語法達到與 JavaScript 類似的異步執行效果,這篇寫一下異步的用法與特性。
一個最簡單的異步程式例子:
import asyncio
async def main():
print('hello')
await asyncio.sleep(10)
print('world')
if __name__ == '__main__':
asyncio.run(main())在上面的例子內有幾個 Python 異步程式的特徵:
- 由
async關鍵字定義main()為異步函式。 - 函式內有
await關鍵字的敘述句表示該敘述為異步執行。 await後呼叫的函式asyncio.sleep()為一個具有 awaitable 的函式,對await關鍵字來說,後面呼叫的函式有 awaitable 屬性是必須的,否則會引發錯誤。asyncio.run()負責把main()丟進 event loop 執行。
協程與 event loop
如果我們直接跑那個 async 定義的 main(),Python 會告訴我們此函式它生成一個 coroutine(協程)物件,並且因為沒有 asyncio 模組幫忙把它丟給 event loop,此時函式不會被執行:
main()
<coroutine object main at 0x1053bb7c8>從這裡我們可以瞭解到,async 函式會生成 coroutine(協程)物件,它們是反應物與生成物的關係,口語上,我們比較常講「async 函式」,實際上,在 event loop 中被執行的是生成的 coroutine 物件,書面上或口語上可以把它們等而視之。
Event loop 接受並調度一系列的 async 函式(嚴謹的說是協程物件)。不像上面簡化的例子,在真實的場景中,往往有多個工作需要並行前進,例如某個 API 後端,要一邊持續收 message queue 訊息,一邊要回應客戶端請求,又例如要並行下載多個檔案等等,這類涉及網路 IO 與磁碟 IO 的任務統稱為 IO-bound 任務,IO-bound 特性是 IO 存取頻繁,而 CPU 運算強度不高,如同 asyncio 名字所表示,它特別適合處理 IO-bound 類並行任務,反之如果是 CPU-bound 或 GPU-bound 這類運算密集型任務,例如算圖、音訊、視訊、影像處理,則比較適合用多進程解決,發揮 CPU 多核心的特長,或是調用 GPU 運算函式庫處理,當然也有某些應用場景可能既是 IO-bound 又是 CPU-bound 也是 GPU-bound,那它就會同時用上多進程多線程 asyncio + CUDA。
Python event loop 的一項特性是一個線程只能跑一個 event loop,event loop 不能跨線程,但是如上段結尾所提,我可以開很多個線程,每個線程都跑自己的 event loop,當然,要玩多進程又多線程還要跑 asyncio / event loop,就很考驗我們的調度能力了。
async / await
回到上文程式範例,event loop 會自行調度每個協程任務,無需人為介入,執行 await asyncio.sleep(10) 語句時,如果在這個 event loop 中還有其他協程任務時,睡著的這十秒並不會阻塞其他協程任務進行,在睡覺期間,event loop 會叫起別的協程任務運行,如此達到並行運算的效果。
然而因為 await 的關係,Python 會確保下一句 print('world') 在睡完後再執行,因為我們還是要求程式是有序執行的嘛,如果程式主要邏輯變成這樣:
await asyncio.sleep(10)
await asyncio.sleep(10)那 Python 會睡幾秒?答案是二十秒,它們並沒有「並行睡十秒」,這似乎和前文說的並行運算有所違背,癥結就在那個 await,它保證了程式執行的有序性,問題來了,既然有序,又如何並行?
TaskGroup
從 Python 3.11 起,對於並行協程,引入了 task group 方案,在此之前,什麼 asyncio.create_task()、asyncio.gather() 等等的招式在此不提,task group 跑並行協程範例如下,這次我們用比較實際的(假裝)並行下載示範:
import asyncio
async def download(url: str, sleep_duration: int):
await asyncio.sleep(sleep_duration)
print(url)
return url
url1 = 'task1'
url2 = 'task2'
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(download(url1, 10))
task2 = tg.create_task(download(url2, 3))
task1_result = await task1
task2_result = await task2
print('All done')
return (task1_result, task2_result)
if __name__ == '__main__':
asyncio.run(main())輸出為:
task2
task1
All done並且此時總時長為 10 秒,不是 13 秒。
來認識一下 task group:
async with xxx啟動了異步的 context,而asyncio.TaskGroup()本身就是異步 context 的生成器,它負責實作異步 context 要求的aenter()與aexit(),如同一般的 context 生成器一樣,只不過前綴了a表示 async,異步 context 裡面理所當然的可以執行await xxx()。task1 = tg.create_task(download(url1, 10))這句會生成一個Task物件,這也是 event loop 的管理對象,一個 coroutine 可以顯式或隱式的被封裝成Task物件,然後交由 event loop 管理與執行,這裡我們需要關注的是Task物件一旦生成就立刻被執行,因此task1與task2兩者幾乎是同時執行,也就是並行。await task1用於取得task1的回傳值,如上點所說task1本身不是函式,它是個Task物件,所以是await task1,後面不用加括號。await xxx在這裡不影響兩個任務的進行順序,如上點所說,它們在各自建立為Task物件時就已經執行了,所以我們看到先輸出的是只睡三秒的 task2 而不是先await的 task1。
Task 是 asyncio 的一個類,顧名思義,就是任務。前面定義的異步函式、coroutine 物件都需要被顯式的或隱式的轉換成 Task 物件才能交給 event loop 去運行,Task 物件可以是一份任務,或者是 N 份任務的清單。
透過 create_task() 去運行多個任務,就可以做到並行的效果。
再舉個實際例子,某個專案要訂閱 MQTT 訊息,把訊息轉入 MongoDB job queue,又要跑 job worker,還要透過 ZeroMQ 把 job 處理結果丟出去,這麼多事摻在一起,至少有兩個無限迴圈要跑:
- 監看 queue job 的 job worker
- 監聽 MQTT 訊息的訂閱器
此專案啟動範例如下:
import mongodb
import mqtt
import zeromq
async def main():
# 先連接到 MongoDB
await mongodb.initialize()
# 再綁定 ZeroMQ
await zeromq.bind_address()
async with asyncio.TaskGroup() as tg:
# 跑 MongoDB job queue worker
tg.create_task(job_queue.run())
# 監聽 MQTT 訂閱訊息
tg.create_task(mqtt.listen())
if __name__ == '__main__':
asyncio.run(main())主函式裡面透過 await xxx 確保了會先連線到 MongoDB 與綁定 ZeroMQ,才會去跑後面的 task group,而 task group 內跑 job worker 與接收 MQTT 訊息則是並行發生的,如此保證了該有序執行的有序執行,該並行運行的並行運行。
結語
其實用上 asyncio 很多時候是因為被套件所逼,一旦某套件說它得異步調用,那我們就得一路 async / await 下去,而 asyncio 對 IO-bound 工作究竟有沒有神奇的功效,目前說法莫衷一是,畢竟每個評價方式都不一樣,無論如何 asyncio 還是要用上的,等 Python GIL 封印解除,就可以到達提速五倍的境界了吧。

