最近因為任務關係對 NATS Streaming 做了一些研究跟 POC,這邊把一些研究到的NATS Streaming 特性記錄下來
什麼是 NATS
NATS 是一種 Queue , 它效能比一般的 Queue 好上許多,號稱 Sender / Reciever 每秒可高達 20 萬筆 Message,這是非常驚人的數字。
(來源:https://bravenewgeek.com/dissecting-message-queues/)
可以看到 RabbitMQ 在它旁邊矮了一大截….
NATS 支援 Subscribe 模式,Subscriber 訂閱需要的 subject,當 NATS 收到 Message 時就會同時派發給所有訂閱者。
(來源:https://nats-io.github.io/docs/developer/concepts/pubsub.html)
NATS 也支援 Queu Group,NATS 會對同一個 Queue Group 的所有 Subscriber 分配訂閱的 Message,而不會重複
(來源:https://nats-io.github.io/docs/developer/concepts/queue.html)
不過這些都不算太特別,畢竟 RabbitMQ 一樣能做到相同的功能,最特別的是 NATS Streaming
什麼是 NATS Streaming
NATS Streaming 是附加在 NATS 之上的加值功能,他可以永久的保留所有的 Message,並且支援 Cursor(指標)功能,你可以任意的讓 Subscriber 回到特定的點 重播 所有的 Message。
這一點相當有用,試想一下,如果今天我們將所有交易的過程都透過 NATS Streaming 記錄下來,假設資料庫或記錄的地方損毀,我們只要重新 Replay 所有 Message,即能重建最後的結果。這也是 Event Sourcing 中最重要的的一環。
為了達成 Event Sourcing 這個設計模式的目標,演練了一些特定的情境
- 為了達成 HA 及加快處理速度的需求,需要多個 Subscriber 同時訂閱相同 Subject,並且彼此不會做到重複的 Message。
- 演練災難還原,Subscriber 必須能回朔到過去的的定時間點,重新 Replay 所有歷程
1. Queue Group
其實第一項相對於容易,只要用 Queue Group 即可輕鬆達成,只要每個 Subscriber 給定同一組 Queue Group Name 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| var options = StanOptions.GetDefaultOptions();
var subOptions = StanSubscriptionOptions.GetDefaultOptions(); subOptions.MaxInflight = 1;
var cf = new StanConnectionFactory();
var c = cf.CreateConnection("test-cluster", clientId, options); var s = c.Subscribe("foo", queueGroup, subOptions, (obj, args) => { ...... });
s.Close(); c.Close(); c.Dispose();
|
這邊需要特別注意 MaxInflight 這個參數,為了效率 NATS 預設會一次派給 Subscriber 1000 則Message(預設值有點忘記),好讓 Subscriber 不需要每做完一筆才透過網路要下一筆 Message,但這也讓我測出一個問題,當預收的訊息太多並且 Subscriber 根本來不及消化掉,這時候 NATS 會判定該則訊息 timeout,轉而派給同群別的Subscriber 來處理,這時候可能會發生兩個 Subscriber 執行到同一筆 Message 的狀況,所以請估算好 MaxInflight 的值,避免抓一堆做不完重派的狀況發生。
當最後一個 Subscriber 離線後,Queue Group 即會被刪除
2. Durable Name
前面有提到 NATS Streaming 支援 Cursor 的功能,當 Subscriber 都離線時,如果有給它 Durable Name,他會記得上次你執行到哪一筆,當下次 Subscriber 重新連上時會從那個點繼續往下派發
1 2
| var subOptions = StanSubscriptionOptions.GetDefaultOptions(); subOptions.DurableName = _appSetting.DURABLENAME;
|
3. StartAt
接著來實作災難還原的步驟,首先如何讓特定的 Queue Group 退到特定的點,呈上,同時多個 Subscriber 與 Durable Name 都還是要能符合。
SDK 其實有提供 StartAt() 的 API
1 2 3
| var subOptions = StanSubscriptionOptions.GetDefaultOptions(); subOptions.StartAt(22);
|
不過這邊需特別注意,同一個 Queue Group 只有在第一個 Subscriber 進來時(也就建立這個 Queue Group 的時候)StartAt 才會生效,之後進來的 Subscriber 帶這個值都會被直接忽略掉。
這邊問題就來了,當我們帶了 Queue Group + Durable Name,即便所有的 Subscriber 都離線了,NATS 還是會貼心的幫你記錄最後執行到的地方,所以之後進來的 Subscriber 也都不可能是第一個了,換言之這樣的狀況是無法重新設定指標的。
所以如果要刪除 Queue Group + Durable Name 的 Queue Group 唯一的方法只有讓所有的 Subscriber 退訂閱
這邊跟上面的範例程式不一樣,上面的是用 **Close()**,這樣並不會讓 Subscriber 退訂閱,只會是離線而已,唯有用 Unsubscribe() 將 Subscriber 全部退訂閱後,有 Durable Name 的 Queue Group 才會被刪除掉。
另外 NATS SDK 並沒有提供查詢 Queue Group 所有 Subscriber 的 Client ID,依據官方的說法,這份清單應該是使用者需要自己維護的,在上面這個案例上,就會需要將所有 Subscriber 用相同 Client ID 連上線後退訂閱。
4. ManualAcks
用上述的方法成功刪除 Queue Group 後,下一步就是重新讓第一個 Subscriber 去建立 Queue Group 並指定 StartAt ,不過這邊有點麻煩的地方是,Subscriber 一連線後馬上就會開始拋 Message 過來,這時後如果你是希望災難還原與重新執行 Replay 分開做時就會很困擾。
預設 StanSubscriptionOptions 是收到 Message 後自動就幫你 Ack 回報 NATS 你收到了在處理了,NATS 也就會繼續往下把下一筆分配給別人,但我只是想指定位子不想要處理 Message 呢?
這時候就要透過設定將 Ack 改成手動回報
1 2 3 4 5 6 7 8 9
| var subOptions = StanSubscriptionOptions.GetDefaultOptions(); subOptions.ManualAcks = true;
.......
var s = c.Subscribe("foo", queueGroup, subOptions, (obj, args) => { args.Message.Ack(); });
|
這樣只要不呼叫 **Ack()**,就等於不處理這筆 Message 了,當 Subscriber 離線或是 Message timeout,NATS 就會重新分派給其他 Subscriber 處理了,這樣也間接達到重建 Queue Group 與指定 Cursor 的目標。