NATS.io
1. NATS.ioとは
- NATSはOSSのメッセージキュー
- NATSサーバ自体はGoで実装されている
- クライアントライブラリは各種言語向けが用意されている
2. NATSとNATS JetStream
- NATS単体はPub/Subシステム
- NATS JetStreameはNATSにストリーミング機能と永続化機能を追加したもの
- つまり、NATS単体にはメッセージの永続化機能は無い
- NATS単体では
at most once
- SubscriberがNATSサーバに接続していない状態ではメッセージを受信できない&メッセージは消える
- NATS JetStreameは
exactly once
- 必ずメッセージを実行できる(Subscriberが受信できなかった時はリトライする)
- メッセージの重複が起きない構造になっている(らしい)
3. KEY-VALUEストア
- NATS JetStreameの永続化機能を利用したKEY-VALUEストアを提供可能らしい
- NATS JetStreameは分散システムとして振る舞うのでmemcacheやredisの代用にもなる模様
- [MEMO] そこまで興味はない...
- cf. Key/Value Store | NATS Docs
4. オブジェクトストレージ
5. docker composeで動かしてみる
version: "3.5"
services:
nats:
image: nats
ports:
- "8222:8222"
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222 "
nats-1:
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
depends_on: [ "nats" ]
nats-2:
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
depends_on: [ "nats" ]
nats-cli:
image: natsio/nats-box
$ docker compose up
- サブスクライバ
- ターミナルを別ウィンドウで開くこと
$ docker compose run nats-cli nats sub -s nats://nats:4222 hello
$ docker compose run nats-cli nats pub -s nats://nats:4222 hello WORLD
[#1] Received on "hello"
WORLD
6. jetstreamモードで起動する
version: "3.5"
services:
nats:
image: nats
ports:
- 4222:4222
- 8222:8222
command: [
"--port",
"4222",
"--http_port",
"8222",
"--jetstream",
"--store_dir",
"/data",
"--debug",
]
nats-cli:
image: natsio/nats-box
-
--store_dir
で指定されたディレクトリにメッセージが保存される
- コンテナ再起動でメッセージがロストしないようにホストマシンとボリュームを共有した方がよさそう
7. NATSのメッセージモデル
- Publisher
- Subscriber
- Subject
- メッセージの名前空間(のようなもの)
- 階層化できる
8. NATSのメッセージ配信モデル
- (1) Pub/Sub
- (2) Request/Reply
- (3) Queue Group
9. サブスクライバの実装サンプル
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// NATSサーバーに接続
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// メッセージを受信する処理
_, err = nc.Subscribe("my-subject", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
// メッセージ処理に時間がかかる場合は、処理時間を調整する
time.Sleep(time.Second * 2)
})
if err != nil {
log.Fatal(err)
}
// シグナルを受け取るチャネル
c := make(chan os.Signal, 1)
// 監視するシグナルを登録 (SIGINT, SIGTERMなど)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
// シグナルを受信するまで待つ
<-c
// Graceful Shutdown開始
fmt.Println("Shutting down...")
// 新しいメッセージの受信を停止
nc.Drain()
// 未処理のメッセージが処理されるまで待つ
time.Sleep(time.Second * 5)
// NATS接続を閉じる
nc.Close()
fmt.Println("Exited gracefully")
}
参考資料