這個站成立這麼久,終於出現一篇談設計模式的文了,代表我的人生進階了嗎?(沒有。)

最近遇到一個需求,當某類物件狀態發生變化時,要有三四個模組被通知到,並且各自做出反應。

講得更具體一點,一個多管道社群貼文發布系統,它的核心功能是,一則訊息在系統內發出後,要發到 Twitter、Facebook、Instagram 等外部社群,當不採取觀察者模式時,程式碼大概會長這樣:

# twitter.py
def publish_to_twitter(post_content: str): ...


# facebook.py
def publish_to_facebook(post_content: str): ...


# instagram.py
def publish_to_instagram(post_content: str): ...


# main.py
@router.post('/posts/publish')
def publish(post_content: str):
    publish_to_twitter(post_content)
    publish_to_facebook(post_content)
    publish_to_instagram(post_content)

然而隨著社群型態的豐富,或者隨著業務需求擴展,要覆蓋的社群越來越多,例如 Mastodon、Threads、Plurk、LinkedIn、MeWe、Gab 等等,為了滿這些需求,上面的 publish_to_xxx() 也得越來越長,另外,除了發貼文外,也還有其他行為,像是偷改貼文、偷刪貼文等等,這些都是社群小編普遍需要的功能,也因此,除了好長的 publish_to_xxx() 外,還有好長的 update_to_xxx()delete_from_xxx(),每個動作下面接的函式都又冗又長,這種時候,就可考慮採用 pyee 與觀察者模式讓應用內的模組解耦,使模組間的邊界更清晰。

pyee

pyee 的「ee」表示 event emitter,名字與特性都取自 Node.js 的 event emitter 模組,顧名思義,它的主要特性就是事件發送與處理,下面我們邊用邊認識 pyee。

延續上面的案例,採用 pyee 後,上面的程式碼變這樣:

# event.py
from pyee import EventEmitter
ee = EventEmitter()

@ee.on('error')
def on_error(*args, **kwargs):
    logger.error({'event': 'error', 'args': args, 'kwargs': kwargs})


# mastodon.py
from event import ee
@ee.on('publishing')
def publish_to_mastodon(post_content: str, *args, **kwargs): ...


# threads.py
from event import ee
@ee.on('publishing')
def publish_to_threads(post_content: str, *args, **kwargs): ...


# xxx.py
from event import ee
@ee.on('publishing')
def publish_to_xxx(post_content: str, *args, **kwargs): ...


# main.py
from event import ee
@router.post('/posts/publish')
def publish(post_content: str):
    ee.emit('publishing', post_content=post_content)
  • 我們先在 event.py 建立一個 EventEmitter 實例 ee,它是事件發布與事件處理函式的主角。
  • 然後我們在各個社群模組中透過裝飾器 @ee.on('publishing') 註冊當 publishing 事件發生時,各模組的處理函式。
  • 在我們自己的 API 端點,收到客戶端的發布請求後,透過 ee 發射 publishing 事件,如此先前註冊的那些處理函式就會逐個執行。
  • 另外 event.py 還有個 error 事件觀察者,這個 error 事件是事件處理器發生例外時,pyee 自己會發出的事件

在 pyee 的設計裡,一個事件可以有多個處理器,它們是依序被呼叫的,當一個事件的處理器發生沒捕捉的例外,也沒有在 error 處理器這邊處理的話,那這個事件的後續其它處理器也就此中斷了。

上面的範例都是傳統的同步函式,而 pyee 也有異步的 pyee.asyncio.AsyncIOEventEmitter 可以使用,如果事件處理器是帶有 async 前綴的異步函式,那就得用 AsyncIOEventEmitter 才能正確工作囉,而即便是異步,事件處理函式依然是依序被呼叫的,所以前面的規則依然適用。

pyee 本身還有提供一些管理 API 為我們所用,參照下面程式碼:

from pyee.asyncio import AsyncIOEventEmitter
ee = EventEmitter()

# 列出所有觀察者觀察的事件
ee.event_names() # ['publishing', 'updating', 'deletion', ...]

# 列出一個事件的觀察者
ee.listeners('publishing') # [mastodon.publish_to_mastodon, ...]

在 pyee 的加持下,就算未來有更多新興社群,主程式也都不需要再改,它一律發出同樣的 publishing 事件,各個社群只要自行向 ee 註冊自己的事件處理器就好,如此主程式與那些社群串接模組就解除了依賴與耦合,這樣他們各自的邊界是不是更清晰了呢。

模組邊界清晰對多人開發的好處是顯而易見的,負責處理主要邏輯的同事和負責串接 Mastodon 的同事再也不會互相改到彼此的模組,滿足了我輩資訊人莫名對解耦的追求。

觀察者模式

回頭談觀察者模式,在上面的例子中,那些被 @ee.on() 裝飾器註冊的函式我們稱為觀察者,他們會觀察 event emitter 發出的事件,如果是他們所關心的事件,他們就會接手處理,所以我們也可以叫他們事件處理器,從底層的觀點看,其實這都是 EventEmitter 在背後幫我們呼叫那些處理器工作,所謂觀察者是從高層次的觀點給的命名,其實函式還是函式,哪會觀察什麼東西。

前面我們以 pyee 與 publishing 事件舉例觀察者模式的應用,同樣的模式也可以套用在其他事件上,像是 updating、deletion,想得更廣一點,其他應用也有它可以發揮的地方,例如:

  • WMS 系統的發料單的行為與狀態變化事件,像是 created、approved、picked、issued 等等。
  • MES 系統的工單行為與狀態變化事件,像是 created、approved、scheduled、in-progress、finishing 等等。
  • 充電系統的行為與狀態變化事件,像是 available、preparing、charging、suspended、finishing、reserved、unavailable、faulted 等等。

這些狀態變化可能來自用戶請求、程式邏輯,或來自狀態機,總之狀態變更只要牽涉到其他模組需要做出對應的動作,這種時候觀察者模式就是我們的好朋友。

其它有點像又不一樣的設計模式

觀察者模式適用的場景在應用內,隨場景不同,還可以考慮其他類似的設計模式:

  • 發布-訂閱模式(pub / sub pattern),相較於觀察者模式,發布-訂閱模式會依賴一個獨立的訊息代理器(message broker),例如 MQTT 就是此模式下的典型協議,多個服務間不直接通信,而是各自對訊息代理器發布或訂閱關注的主題(topic),如此也解耦了服務間的依賴,但相對的每個服務都依賴於訊息代理器,也就是說訊息代理器本身最好是高可用的,這意味系統複雜度與成本的提升。
  • 調度器模式(dispatcher pattern),在觀察者模式中,event emitter 只負責針對事件呼叫處理器,職責相對單純,而調度器模式中的調度器的職責較重,它主動針對事件做處理與呼叫,它所呼叫的事件處理器並不像觀察者模式中需要先註冊自己關注的事件,哪個事件該怎麼處理、該派給誰處理的職責都交給調度器主理。
  • Hook 模式,此模式下應用內會預先定義好幾個 hook 點,其他模組可以向系統註冊自己的 hook callback 函式,當應用跑到 hook 點時,會去跑已註冊的 hook callback 函式,然後再回到主程式繼續後面的計算,例如可以事先定義一個名為 save_post 的 hook 點,當儲存貼文時,其他的 save_post hook callback 函式會被呼叫,跑一些自己的邏輯,然後再回到主程式,hook 模式大多用於外掛、擴展機制,例如 WordPress、pytest、pluggy 都是以 hook 為基礎給第三方發展外掛。