はじめに
基盤本部で今後のhacomonoのアーキテクチャ設計をしている @bootjp と申します。
最近はマイクロサービス化に向けての社内共通のイベントバスの設計や基盤周りの設計/実装を行っています。
以前にはこのような記事を書き、分散システムや分散データベース、分散ストレージなどが大好きです。
「Goで作って理解するRaftベースRedis互換KVS」という同人誌も書いています。
もし興味のある方はお手にとってみてください。
さて、今回は分散システムやマイクロサービスで頻繁に用いられるQueueについて焦点を当てた記事を書きます。
とくにマネージドサービスを用いない環境では、RedisでQueueを実装してる方は多いのではないでしょうか?
しかし、ナイーブなQueueの実装には罠があり、データロストの危険性があります。
実例とともにデータをロストしないためには何が必要なのかをこの記事で述べていきます。
冪等性に関わる事柄(exactly once, at-least once, 重複排除など)はこの記事では扱わず、データのロストのみに焦点を当てます。
用語
この記事では以下の意味でこれらの用語を用います。
- Queue: キューそのもの、あるいはPub/Subなどのトピック
- メッセージ: Queueに格納されるデータ
- Worker: Queueからメッセージを取得し所定の動作を行うアプリケーションの総称
- ナイーブなQueue: 後述しますが、メッセージ取得と同時にQueueからメッセージを削除するもの、あるいはその実装のライブラリを用いたQueue
- ロスト: Queueに追加されたメッセージがWorkerによる処理を経ず、Queueから消えてしまうこと
どういうケースでデータがロストするのか
ナイーブなQueueの実装はデータが失われると述べました。
実際にデータがロストするケースについて図解します。
Workerクラッシュによるデータロストのケース
---
title: Workerクラッシュによるデータロストのケース
---
sequenceDiagram
autonumber
participant P as Producer
participant R as Redis List (queue)
participant W as Worker
%% Note over P,W: 🟥 = ロストポイント
P->>R: LPUSH "msg-42" <br> メッセージをキューに積む
R-->>P: 格納完了
W->>R: BRPOP (ブロッキング)
R-->>W: "msg-42" を返す ※同時にリストから削除
Note right of W: Worker は「処理中」状態
W-->>W: クラッシュ 🟥
Note over R: Redis に "msg-42" は残っていない
Note right of W: 新 Worker 起動
W->>R: BRPOP
R-->>W: キューは空 🟥
Note over P,W: 結果 → "msg-42" は永遠に失われる
このように(4)でQueueから取得し、すぐにQueueからデータが削除されるとその後、Queueの処理中にWorkerがクラッシュした場合に、データがロストします。
この場合、Wokerが処理をしていたメッセージは失われ、処理が行われません。
クラッシュはどんなときに発生するのか
クラッシュと聞くと、アプリケーションで適切にハンドリングし、Queueに書き戻すことで安全に扱えると思われるかもしれません。
しかし、クラッシュが発生しうるケースというのはいくつかあり、必ずしもアプリケーション側で検知し、回復できるものだけではありません。
| レイヤー | 具体例 | 発生タイミング |
|---|---|---|
| アプリケーション | 未捕捉例外/セグメンテーションフォルト | アプリケーション処理中 |
| プロセス | SIGKILL(OOM Killerなど) | カーネルによる強制終了 |
| VM/ノード | OSカーネルパニック/ホスト障害/リブート | 実行中の VM が落ちる |
| オートスケール | コンテナ/VMのスケールインでの終了 | スケールイン |
このうちSIGKILLとVM/ノードの障害がアプリケーションでの対処が難しい領域です。
これらの問題への対処はアプリケーション側レベルではなく、アーキテクチャレベルで対応していく必要があります。
Visibility Timeout / Acknowledgment deadlineという機能
この問題にアーキテクチャレベルで対処するための仕組みとして、Amazon SQSのVisibility TimeoutやGoogle Cloud Pub/Sub の Acknowledgment deadlineという機能があります。
これは処理中のメッセージが規定の時間以内に応答がないと、処理に失敗したとみなし、再度キューに戻される機能です。
💡Tips: Visibility Timeout は他のワーカーからメッセージが取得できないという機能もあります。
この記事では冪等性についてはスコープ外としており、以降でもこの機能については触れません。
この機能はQueueからメッセージの取得後、所定時間を経過した場合にメッセージを再度取得可能にするという機能です。
以下にこれらの機能を用いてWorkerクラッシュ時に再取得可能なQueueの図を示します。
Workerクラッシュ時にも再取得可能なQueue
---
title: Workerクラッシュ時にも再取得可能なQueue
---
sequenceDiagram
autonumber
participant P as Producer
participant Q as SQS Queue
participant W1 as Worker<br/>(元ワーカー)
participant W2 as Worker<br/>(再取得ワーカー)
%% 🟥 ロストポイント, 🟢 救済ポイント
P->>Q: SendMessage "msg-42"
Q-->>P: メッセージ格納完了
W1->>Q: ReceiveMessage (VisibilityTimeout = 30s)
Q-->>W1: "msg-42" を返す<br/>(不可視状態へ)
Note right of Q: "msg-42" は **in-flight**<br/>キュー上に残りつつ不可視
W1-->>W1: クラッシュ 🟥
Note over W1: ACK(DeleteMessage)<br/>は実行されない
Q-->>Q: 30s タイムアウト満了 🟢
Note right of Q: "msg-42" が再可視化<br/>(自動再配信準備完了)
W2->>Q: ReceiveMessage
Q-->>W2: "msg-42" を再送
W2->>Q: DeleteMessage (ACK)
Q-->>W2: 削除完了<br/>ロスト回避成功
先ほどと同じく、(4)でメッセージを取得後、(5) でクラッシュしております。
しかし、(6) で規定時間のタイムアウトが検知され、別のWorkerにより再取得されます。
なにがこの問題を引き起こすのか
さて、このデータロストの原因を考えてみましょう。
先ほどの 「Workerクラッシュによるデータロストのケース」 ではメッセージの取得とともにQueueからデータを削除しています。
つまり、以下の2つが組み合わさることが原因になります。
- メッセージの取得とともにデータを削除すること
- Wokerがクラッシュすること
つまり、アプリケーション側からの処理完了が返ってくるまでQueueから削除しない ということで安全に処理が可能です。
既存データ型を用いた実装例
既存のOSSではRedisをQueueに使う際に大きく2つの手法を用いています。
- 取得されたメッセージを別のキューに退避させ、一定時間でもとに戻す
- メッセージ毎に所定時間までのロックを生成し、ロック期間がきれたら取得可能なメッセージに戻す
では、Redisではこれらをアトミックに実装するにはどうすればよいでしょうか?
OSSなどの事例ではLuaスクリプトを用いて実装しているケースが一般的なようです。
例えば、BullMQでは以下のLuaスクリプトでアトミックにメッセージを処理中のキューに移動させたうえで、メッセージを取得しています。
このような処理にLuaが使われる背景ですが、Multi+Execコマンドを用いたTransactionでは一部のコマンドでエラーが発生しても他のすべてのコマンドは実行されます。
そのため、処理中の値を変数に格納し、ロジックを記述できアトミックな挙動となるLuaが好まれているものと思われます。
Errors happening after
EXECinstead are not handled in a special way: all the other commands will be executed even if some command fails during the transaction.
Redis Streamを用いる
Redis version 5で導入された、Stream にはまさにPub/Subのための機能があります。
新規で採用する際にはこちらを検討するのもよいでしょう。
以下のように、Streamを使用したキューの実装例を見ていきましょう:
$ redis-cli # 以降はredis-cliの対話シェル # ストリームにメッセージを追加 127.0.0.1:6379> XADD mystream * field1 value1 field2 value2 "1748336513543-0" # コンシューマグループを作成 127.0.0.1:6379> XGROUP CREATE mystream mygroup 0 MKSTREAM OK # グループからメッセージを読み取る # 末尾の > は未配信メッセージのみの指定 # https://redis.io/docs/latest/commands/xreadgroup/#:~:text=The%20special%20%3E%20ID%2C%20which%20means%20that%20the%20consumer%20want%20to%20receive%20only%20messages%20that%20were%20never%20delivered%20to%20any%20other%20consumer.%20It%20just%20means%2C%20give%20me%20new%20messages. 127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) "1748336513543-0" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" # 保留中メッセージの取得 127.0.0.1:6379> XPENDING mystream mygroup 1) (integer) 1 2) "1748336513543-0" 3) "1748336513543-0" 4) 1) 1) "consumer1" 2) "1" # メッセージを処理後、ACK して Pending Entry Listから削除 127.0.0.1:6379> XACK mystream mygroup 1748336513543-0 (integer) 1 # エントリが消えていることを確認 127.0.0.1:6379> XPENDING mystream mygroup 1) (integer) 0 2) (nil) 3) (nil) 4) (nil)
主要ライブラリのデータロストの有無
| 言語 | ライブラリ | クラッシュ検知手法 | 再配信トリガー | クラッシュ時のメッセージ消失 |
|---|---|---|---|---|
| Node.js | BullMQ | ジョブのロック期間 (lockDuration), ワーカーのハートビート (ロック自動更新) | 定期実行 | なし |
| Node.js | Bee-Queue | stallInterval (ワーカーからの応答監視) | 定期実行 | なし |
| Ruby | Sidekiq OSS | なし | なし | あり |
| Ruby | Sidekiq Pro | ワーカーのハートビート (super_fetch 時) | 定期実行 | なし (super_fetch利用時) |
| Ruby | Resque | なし | なし | - あり - (実装) |
| Go | Asynq | ワーカーのハートビート, タスク期限 (Timeout, Deadline) | 定期実行 | なし |
まとめ
RedisのQueueをナイーブに用いた実装はWorkerのクラッシュ時にデータロストの危険性があります。
特にデータロストが許容できない用途でのQueueの利用の際には、アーキテクチャレベルでWorkerクラッシュ時の再配信の仕組みを検討するようにしましょう。
また、マネージドサービスを用いない場合は、ライブラリの実装やドキュメント上のトレードオフを確認し、要件にマッチするか確認の上検討しましょう。
このようなノード障害やSIGKILLによるクラッシュは今回のようなQueueに限らず、高い信頼性が求められるアプリケーションでは常に想定の上で設計を行いましょう。
特にマイクロサービスなどの分散システムでは様々なサービス間の通信を非同期で行い、単一のトランザクションで制御不可能な事が多いです。
アプリケーションに求められる信頼性とそれを実現するアーキテクチャ、そしてそれらのトレードオフを探りつつ、最適なバランスを求めていきましょう。
仲間を募集しています
株式会社hacomonoでは一緒に働く仲間を募集しています。
私たちは、技術の仕組みとそれが必要になる背景を深く理解し、そのトレードオフを見極めながら、ビジネスニーズに合わせた適切な実装ができるエンジニアを探しています。
hacomonoではまさにマイクロサービス化に向けて、高い可用性を備えたアーキテクチャの設計や高い信頼性を維持できる形で運用できるシステムの設計、そして実装できるエンジニアを探しています。
💁 関連記事