非同步程式 async / await#

asyncawait 是 Python 3.5 以後引入的語法,用來寫 非同步(asynchronous)程式 。核心概念是:當程式要「等待外部事件」(網路、磁碟、資料庫)時,不要乾等,讓出 CPU 給別的任務用


為什麼要非同步#

用一個最小範例對照同步與非同步寫法——兩個任務 A / B 分別要等 3、2 秒(模擬網路、磁碟等外部 IO)。

同步版本#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import time

def fetch(name, seconds):
    print(f"{name} 開始")
    time.sleep(seconds)              # 模擬等網路
    print(f"{name} 完成")
    return name

t0 = time.time()
results = [fetch("A", 3), fetch("B", 2)]
print(results)
print(time.time() - t0)              # ≈ 5 秒

每個 fetch() 必須跑完才能換下一個,期間 CPU 完全閒置。總時間 3+2 = 5 秒 線性相加。

非同步版本#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio, time

async def fetch(name, seconds):
    print(f"{name} 開始")
    await asyncio.sleep(seconds)     # 模擬等網路
    print(f"{name} 完成")
    return name

async def main():
    results = await asyncio.gather(
        fetch("A", 3),
        fetch("B", 2),
    )
    print(results)

t0 = time.time()
asyncio.run(main())
print(time.time() - t0)              # ≈ 3 秒

兩個任務幾乎同時送出,誰先等完就先結束 ,總時間接近「最慢的那一個」(A 的 3 秒)而非全部加總。

執行結果:

A 開始
B 開始
B 完成
A 完成
['A', 'B']

兩個 開始 在同一瞬間印出——證明任務是並行等待 ,不是排隊執行;接著按等待秒數由短到長依序 完成

對照#

版本寫法總耗時關鍵差別
同步time.sleep + 循序呼叫3+2 = 5 秒一個做完才換下一個
非同步asyncio.sleep + asyncio.gather≈ 3 秒兩個同時等、誰先完成誰先結束

接下來先用時序圖整體看一次非同步流程,再依序拆解每個元件。


時序圖:協程與事件迴圈#

非同步程式跑起來的整體樣貌可以用時序圖看一次。下圖以前一節的範例為例(maingather 並行 A / B),呈現事件迴圈和協程之間的互動:

      事件迴圈                                       協程
         │
         │ ◀── ① asyncio.run(main())
         │       建立新事件迴圈、main 包成 Task 加進待辦
         │
         │ ── ② 啟動事件迴圈,執行 main task ─────────▶ main: running
         │                                             │
         │                                            await asyncio.gather(fetch("A", 3), fetch("B", 2))
         │                                              (gather 先把兩者包成 Task 加入待辦清單)
         │ ◀── ③ 把控制權交回事件迴圈 ──────────────────
         │
         │ ── ④ 從待辦挑 A 執行 ────────────────────▶ A: running
         │                                            │
         │                                            await asyncio.sleep(3)
         │ ◀── ⑤ 等 sleep,把控制權交回事件迴圈 ──────────
         │
         │ ── ⑥ 從待辦挑 B 執行 ────────────────────▶ B: running
         │                                            │
         │                                            await asyncio.sleep(2)
         │ ◀── ⑦ 等 sleep,把控制權交回事件迴圈 ──────────
         │		 
         │   ** 三個任務都在等: 事件迴圈閒置,等最早到期的計時器 ** 
         │
         │ ── ⑧ B 的 sleep(2) 到期,喚醒 B ─────────▶ B: 從 await 後繼續 → return
         │
         │ ── ⑨ A 的 sleep(3) 到期,喚醒 A ─────────▶ A: 從 await 後繼續 → return
         │
         │       A、B 完成,gather 解除 main 的等待
         │ ── ⑩ 喚醒 main ─────────────────────────▶ main: 從 await 後繼續 → return(main() 結束)
         │
         │ ── ⑪ asyncio.run(main()) 結束事件迴圈

圖裡有三類元件,後面三節依序拆解:

  • 事件迴圈 ——左側時間軸;由 asyncio.run 啟動(步驟 ①)
  • Task / 協程 ——右側 main / A / B ;Task 是「事件迴圈待辦清單上的協程」(下節詳述),gather 把協程包成 Task 加入待辦(③)
  • await ——協程主動把控制權交回事件迴圈的時機(⑤、⑦)

其他步驟(②、④、⑥、⑧、⑨、⑩、⑪)都是事件迴圈自動處理的內部切換——只挑 已經準備好(等待條件已滿足) 的協程,沒人準備好就閒置等到期;待辦清空後 asyncio.run 收尾關閉迴圈。


事件迴圈(event loop)#

事件迴圈(event loop)是整個 asyncio 的核心。官方文件指揮家 來形容它——指揮家本身不演奏樂器,但負責決定下一位演奏者是誰、什麼時候換人。

事件迴圈的運作方式:

  • 維護一份 待辦清單(a collection of jobs)
  • 從清單挑一個協程,把控制權交給它
  • 協程跑到暫停點(例如 await 等 IO)或執行完畢時,控制權回到事件迴圈
  • 事件迴圈再從清單挑下一個準備好的協程
  • 清單清空時事件迴圈會休息,不會空轉耗 CPU

這是一種 合作式多工(cooperative multitasking) :每個協程必須「懂事地讓位」,不能霸佔事件迴圈太久,否則其他協程會被餓死(starved)。這和作業系統的搶占式多工(preemptive)不同——後者由 OS 強制切換,協程這邊則是自願讓出。

還要特別強調:整個過程只有一個執行緒、一個 CPU 核心 ,不是多執行緒、也不是多核心平行運算。純粹靠「不乾等」擠出效率——因此非同步適合 IO 密集(等網路、等磁碟),不適合 CPU 密集(運算本身沒有等待可以讓出)。

asyncio.run(coro):啟動事件迴圈的入口#

事件迴圈本身不會自己跑起來,必須由 asyncio.run 啟動——對應時序圖的步驟 ①。

asyncio.run(coro) 會做四件事:

  1. 建立一個新的事件迴圈
  2. coro 包成 main task 排進待辦
  3. 執行事件迴圈直到 main task 完成
  4. 關閉事件迴圈
1
2
3
4
async def main():
    print("hello")

asyncio.run(main())

整個程式從頭到尾只會呼叫一次 asyncio.run ,作為非同步世界的進入點。


協程(coroutine)與Task#

協程函式與協程物件#

協程函式是一種可以暫停和恢復的函式 。一般函式一旦呼叫就會從頭跑到結束,中途無法中斷;協程則可以在某個點「暫停,先讓別人跑」,之後再從暫停處繼續:

1
2
3
4
5
6
7
8
def normal():
    step_1()
    step_2()           # 一定接著 step_1 跑,中間不會讓出 CPU

async def coro():
    step_1()
    await something()  # 到這裡會暫停,CPU 讓給別的協程
    step_2()           # 別人跑完、自己等到了,就從這裡恢復

這個「可暫停、可恢復」的能力,正是非同步行為成立的關鍵。

  • async def 宣告 協程函式(coroutine function)
  • 呼叫協程函式不會執行函式內容 ,只會回傳一個 協程物件(coroutine object),必須把它放進事件迴圈的待辦清單才會執行。
  • 協程物件是 awaitable ,用來接在 await 後面(〈await〉一節整理)
1
2
3
4
5
6
7
8
async def hello():  # 宣告 協程函式
    print("hi")

print(hello)        # <function hello at 0x...>      ← 協程函式(不是 awaitable)
coro = hello()      # ← 協程物件(這個才是 awaitable)
print(type(coro))   #   <class 'coroutine'>
print(coro)         #   <coroutine object hello at 0x...>
# 注意:函式裡的 print("hi") 還沒跑!

Task:綁定到事件迴圈的協程#

官方文件 對 Task 的定義是:

Roughly speaking, tasks are coroutines (not coroutine functions) tied to an event loop.

換句話說,Task 就是「已經綁定到事件迴圈、進入待辦清單」的協程——協程物件本身只是函式邏輯,要被綁成 Task 才會真正被事件迴圈排程執行。

產生 Task 有兩種方式:

asyncio.create_task(coro):啟動背景任務#

適合「邊跑邊動態新增任務」,呼叫的當下就把協程排進待辦,回傳 Task 物件可以稍後 await

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
async def worker(i):
    await asyncio.sleep(1)
    print(f"任務 {i} 完成")

async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(5)]
    # 可以在這裡做別的事
    await asyncio.gather(*tasks)    # 最後再一起等

asyncio.run(main())

asyncio.gather(*coros):並行多個協程#

gather 把多個協程 自動包成 Task 加入待辦清單 ,回傳一個 awaitable,要配合 await 才會等它們全部完成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
async def fetch(i):
    await asyncio.sleep(1)
    return i * 2

async def main():
    results = await asyncio.gather(
        fetch(1), fetch(2), fetch(3), fetch(4), fetch(5)
    )
    print(results)        # [2, 4, 6, 8, 10]

asyncio.run(main())

五個 fetch() 各等 1 秒,但因為被一起包成 Task 排進待辦,事件迴圈會在它們之間穿插執行——實際只花約 1 秒。


await#

對應時序圖中協程主動「讓出控制權」的時機(步驟 ③、⑤、⑦)。await <awaitable> 的作用是 把控制權交回事件迴圈 ,讓事件迴圈趁空檔去跑別的協程。

awaitable 物件#

await 後面需要接 awaitable 物件,總共有三種:

型別怎麼來的定位
Coroutine(協程物件)呼叫 async def 函式代表「函式的執行邏輯」本身,必須被啟動才會跑;await不會 讓出控制權
Taskasyncio.create_task(coro) ,或 asyncio.gather(*coros) 自動產生綁定到事件迴圈的協程;await 讓出控制權
Futureasyncio.Future() 或底層 API代表「某個運算的狀態與結果」的容器;官方拿三色號誌燈(紅/黃/綠)比喻 Future 的三種狀態(pending / cancelled / done)。Task 其實是 Future 的子類——日常不直接用,列出備查

await 協程 vs await Task#

有個容易踩到的陷阱:await 一個協程物件,其實不會讓出控制權給事件迴圈 ,效果幾乎等於呼叫一般同步函式。

真正會讓出控制權的是 await 一個 Task 。舉例對照:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio

async def coro_a():
    print("A")

async def coro_b():
    print("B(希望沒人霸佔事件迴圈……)")

async def main():
    task_b = asyncio.create_task(coro_b())  # 包成 Task,加入事件迴圈待辦
    for _ in range(3):
        await coro_a()                      # await 協程——不讓出控制權!
    await task_b                            # await Task——讓出控制權

asyncio.run(main())

輸出:

A
A
A
B(希望沒人霸佔事件迴圈……)

coro_a 一口氣跑完三次才輪到 coro_b——因為 await coro_a() 不會把控制權交回事件迴圈,task_b 一直沒機會被挑到。

await coro_a() 改成 await asyncio.create_task(coro_a())(包成 Task 再 await),輸出就變成:

B(希望沒人霸佔事件迴圈……)
A
A
A

這解釋了為什麼並行要靠 asyncio.gatherasyncio.create_task——它們把協程 綁成 Task(綁定到事件迴圈的協程)await 這些 Task 時才會真的讓出控制權,事件迴圈才能穿插執行別的任務。

asyncio.sleep(delay)#

asyncio.sleep(delay)asyncio 提供的非同步版 sleep,作用是 暫停目前的協程 delay ,常用來模擬等待網路、磁碟等 IO;和 time.sleep 不同的是,它在等待期間會把控制權交回事件迴圈,讓別的任務有機會執行。

看這裡可能會疑惑:asyncio.sleep 是 Coroutine(呼叫後回傳協程物件,不是 Task),按說明 await 它應該不會讓出控制權才對?

但根據官方說明:

sleep() always suspends the current task, allowing other tasks to run.

所以 await asyncio.sleep() 仍然會讓出控制權——這是透過 asyncio.sleep() 協程內部的實作達到。


async for / async with#

除了 async defawait,Python 還有兩個實戰常遇到的非同步變體——在進入下一節 aiohttp 實戰前先認識它們:

語法對應同步版何時用
async withwith進入/離開 context 本身會做 IO(建立 HTTP session、連線資料庫)
async forfor迭代時每次取下一筆會做 IO(資料庫 cursor 逐筆讀取)
1
2
3
4
5
6
7
# 非同步 context manager:進入與離開都可能等 IO
async with aiohttp.ClientSession() as session:
    ...

# 非同步迭代器:每次拿下一筆都可能等 IO
async for row in cursor:
    print(row)

規則和 await 相同:只能在 async def 裡用,底層會在等待時讓出事件迴圈給別的任務。下一節 aiohttp 就會大量用到 async with


實戰:並行抓多個網頁#

同步版用 requests,非同步版要用 aiohttp(因為 requests 本身不支援 async):

1
pip install aiohttp
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://example.com",
        "https://www.python.org",
        "https://httpbin.org/get",
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, html in zip(urls, results):
            print(f"{url}{len(html)} 字元")

t0 = time.time()
asyncio.run(main())
print(f"總耗時:{time.time() - t0:.2f} 秒")

用同樣的邏輯跑 100 個 URL,同步版可能要數十秒,非同步版只要 1–2 秒。


超時與取消#

加上 asyncio.wait_for 設定超時:

1
2
3
4
5
6
7
8
9
async def slow_task():
    await asyncio.sleep(10)        # 假設這是一個會跑很久的 IO
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=3.0)
    except TimeoutError:
        print("超過 3 秒,放棄")

gather 可以指定 return_exceptions=True,讓個別任務失敗不要拖垮全部:

1
2
3
4
5
6
7
8
9
results = await asyncio.gather(
    fetch(1), fetch(2), fetch(3),
    return_exceptions=True     # 失敗的會回傳例外物件,不 raise
)
for r in results:
    if isinstance(r, Exception):
        print(f"任務失敗:{r}")
    else:
        print(f"結果:{r}")

什麼時候該用 async#

場景適合 async?
爬蟲同時抓上百個網頁✅ 非常適合
Web 後端(FastAPI、aiohttp server)✅ 適合,高併發連線
即時聊天、WebSocket✅ 適合
大量資料庫查詢(asyncpg、motor)✅ 適合
影像處理、矩陣運算、壓縮解壓❌ 用 multiprocessing
一次性小腳本、沒有大量等待❌ 用同步就好,別自找麻煩
想讓既有同步函式庫(如 requests)變快❌ 改不了,要換成 aiohttphttpx

和 threading、multiprocessing 的分工#

方式適用情境特色
asyncioIO 密集、大量等待單執行緒、需配合 async 函式庫
threadingIO 密集但想沿用同步函式庫受 GIL 限制、有切換成本
multiprocessingCPU 密集運算繞過 GIL、記憶體不共享、啟動慢

簡單口訣:「等」就用 async、「算」就用 multiprocessing


常見錯誤#

忘記 await 協程物件#

1
2
3
async def main():
    asyncio.sleep(3)        # 錯!協程物件被丟棄,裡面什麼都沒跑
    await asyncio.sleep(3)  # 對

Python 會給 coroutine 'sleep' was never awaited 警告——「建立了協程但沒交給事件迴圈跑」。

把同步函式的回傳值拿去 await#

1
await time.sleep(3)   # TypeError: object NoneType can't be used in 'await' expression

和上面剛好相反:time.sleep() 是普通函式、回傳 None,而 None 不屬於任何 awaitable 型別。只有協程物件、Task、Future 能被 await

在 async 函式裡用同步阻塞函式#

1
2
3
async def bad():
    time.sleep(3)              # 不會報錯,但整個事件迴圈被卡死
    requests.get(url)          # 同上

不會拋錯,但 所有其他協程都被卡住 ,等同失去非同步的意義。要改用 asyncio.sleepaiohttphttpx.AsyncClient 等非同步替代品。

async def 外用 await#

1
await asyncio.sleep(1)         # SyntaxError:只能在 async def 裡用

在 Jupyter 裡用 asyncio.run#

Jupyter 本身已經在跑事件迴圈,直接 asyncio.run() 會報錯。解法:直接 await 就好。

1
2
# Jupyter cell
await main()            # 不用 asyncio.run

速查表#

語法類型作用
async def語法關鍵字宣告協程函式,呼叫時不會立刻執行,回傳協程物件
await expr語法關鍵字expr(awaitable)完成;只能在 async def 裡用
asyncio.run(coro)模組函式程式入口;建立事件迴圈、執行 coro、結束後關閉
asyncio.gather(*coros)模組函式多個協程包成 Task 加入待辦並行等待,回傳 list
asyncio.create_task(coro)模組函式單一協程包成 Task 加入待辦
asyncio.sleep(s)模組函式回傳會等 s 秒的協程物件;要搭配 await 才會執行

小結#

回到開頭那個最小範例——同步版 2 個任務要 5 秒 ,非同步版壓到 3 秒 (任務數越多差距越大,見〈實戰:並行抓多個網頁〉)。不是 CPU 變快,而是「不再為每次 IO 乾等」——等待的時間被重疊了。

三句話記住這一章:

  1. async def 宣告協程、await 等一個「可等待對象」、asyncio.run 啟動事件迴圈
  2. await 是動詞,awaitable 是被等的對象 ——兩者缺一不可
  3. 「等」就用 async、「算」就用 multiprocessing