NATS.io
1. NATS.ioとは
- NATSはOSSのメッセージキュー
- NATSサーバ自体はGoで実装されている
- クライアントライブラリは各種言語向けが用意されている
2. NATSとNATS JetStream
- NATS単体はPub/Subシステム
- NATS JetStreameはNATSにストリーミング機能と永続化機能を追加したもの
- つまり、NATS単体にはメッセージの永続化機能は無い
- NATS単体では
at most once
- SubscriberがNATSサーバに接続していない状態ではメッセージを受信できない&メッセージは消える
- NATS JetStreameは
exactly once
- 必ずメッセージを実行できる(Subscriberが受信できなかった時はリトライする)
- メッセージの重複が起きない構造になっている(らしい)
3. KEY-VALUEストア
- NATS JetStreameの永続化機能を利用したKEY-VALUEストアを提供可能らしい
- NATS JetStreameは分散システムとして振る舞うのでmemcacheやredisの代用にもなる模様
- [MEMO] そこまで興味はない...
- cf. Key/Value Store | NATS Docs
4. オブジェクトストレージ
5. docker composeで動かしてみる
version: "3.5"
services:
nats:
image: nats
ports:
- "8222:8222"
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222 "
nats-1:
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
depends_on: [ "nats" ]
nats-2:
image: nats
command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
depends_on: [ "nats" ]
nats-cli:
image: natsio/nats-box
$ docker compose up
- サブスクライバ
- ターミナルを別ウィンドウで開くこと
$ docker compose exec nats-cli nats sub -s nats://nats:4222 hello
$ docker compose exec nats-cli nats pub -s nats://nats:4222 hello WORLD
[#1] Received on "hello"
WORLD
6. jetstreamモードで起動する
version: "3.5"
services:
nats:
image: nats
ports:
- 4222:4222
- 8222:8222
command: [
"--port",
"4222",
"--http_port",
"8222",
"--jetstream",
"--store_dir",
"/data",
"--debug",
]
nats-cli:
image: natsio/nats-box
-
--store_dir
で指定されたディレクトリにメッセージが保存される
- コンテナ再起動でメッセージがロストしないようにホストマシンとボリュームを共有した方がよさそう
7. NATSのメッセージモデル
- Publisher
- Subscriber
- Subject
- メッセージの名前空間(のようなもの)
- 階層化できる
8. NATSのメッセージ配信モデル
- (1) Pub/Sub
- (2) Request/Reply
- (3) Queue Group
9. サブスクライバの実装サンプル
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// NATSサーバーに接続
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// メッセージを受信する処理
_, err = nc.Subscribe("my-subject", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
// メッセージ処理に時間がかかる場合は、処理時間を調整する
time.Sleep(time.Second * 2)
})
if err != nil {
log.Fatal(err)
}
// シグナルを受け取るチャネル
c := make(chan os.Signal, 1)
// 監視するシグナルを登録 (SIGINT, SIGTERMなど)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
// シグナルを受信するまで待つ
<-c
// Graceful Shutdown開始
fmt.Println("Shutting down...")
// 新しいメッセージの受信を停止
nc.Drain()
// 未処理のメッセージが処理されるまで待つ
time.Sleep(time.Second * 5)
// NATS接続を閉じる
nc.Close()
fmt.Println("Exited gracefully")
}
10. 設定ファイル
nats-server -config <設定ファイル>
host: 0.0.0.0
port: 4222
jetstream: enabled
debug: true
jetstream {
store_dir: /var/nats
max_mem: 1G
max_file_store: 10G
}
11. DrainとStop
- Consumerの動作を停止させる方法はDrainとStopの二種類ある
- Drainはコンシューマにメッセージの受信を停止させた後、バッファに積まれているメッセージを処理した後、Consumerを停止させる
- Stopは即座にConsumerを停止させ、バッファに積まれているメッセージも破棄される
- 破棄されたメッセージはNATSサーバに戻されることはなく、単にデータロストになる
- 以上の性質から、Consumerを停止させたい時は通常Drainを使用し、システムエラーなどシステムの実行が不可能な場合のみStopを使うべきである
12. プル型コンシューマ
- 先述のサンプルコードでは
Consume
を使ってメッセージを受信した
cc, err := consumer.Consume(func(msg jetstream.Msg) {
fmt.Println("Message: ", string(msg.Data()))
msg.Ack()
})
-
Consume
を使うとメッセージをパブリッシャから送られてくるのを待つプッシュ型コンシューマになる(GCPのPub/Sub、AWSのSNSのイメージ)
- 一方でNATSサーバにメッセージを蓄積して定期的にコンシューマが取得しにいくようにしたい場合もある(AWSのSQSのイメージ)
- ↑をプル型のコンシューマと呼ぶ
- プル型のコンシューマにしたい場合は
Fetch
やFetchNoWait
を使えばよい
- 以下サンプルコード(インポート部分は省略)
func main() {
nc, err := nats.Connect("nats://nats:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := jetstream.New(nc)
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
stream, err := js.Stream(ctx, "TEST")
if err != nil {
log.Fatal(err)
}
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "TEST",
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
log.Fatal(err)
}
for {
mb, err := consumer.FetchNoWait(5)
if err != nil {
fmt.Println("Error:", err)
}
for msg := range mb.Messages() {
fmt.Printf("Data: %s\n", msg.Data())
ackErr := msg.Ack()
if ackErr != nil {
fmt.Println("AckError:", ackErr)
}
}
time.Sleep(10 * time.Second)
}
exitSignal := make(chan os.Signal, 1)
signal.Notify(exitSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-exitSignal
nc.Close()
}
- 10秒ごとにNATSサーバにメッセージを取得するようになっている
- Cronやスケジューラに起動を任せるならループ部分は不要になる
-
Fetch
とFetchNoWait
の違いはメッセージが無かった場合待機するかどうか
-
Fetch
はメッセージが無かった場合、3秒待機(変更可能)する(SQSで言うロングポーリングみたいなイメージ)
-
FetchNoWait
は名前の通り、待機せず結果を返す
- サンプルコードのように明示的にスリープ時間を入れるなら
FetchNoWait
を使うとよいだろう
- プッシュ型とプル型の使い分け
- リアルタイム性が要求されるならプッシュ型
- DBへの負荷を抑えたい(書き込み量をコントロールしたい)、一度に処理するデータ量をコントロールしたいのであればプル型
- 似たような要件で、外部APIのレートリミットを超えないようにリクエスト数を制御したいといった単位時間辺りの処理件数を制御したいならプル型
参考資料
Appendix-1 動作確認プロジェクト
Dockerfile
FROM golang:1.23
WORKDIR /app
ENV CGO_ENABLED=1 \
GOOS=linux \
GOARCH=amd64
ADD . /app
CMD ["go", "run", "/app/main.go"]
docker-compose.yaml
services:
nats:
image: nats
ports:
- "4222:4222"
volumes:
- type: bind
source: "./nats.conf"
target: "/nats.conf"
command: ["-config", "/nats.conf"]
nats-cli:
image: natsio/nats-box
go:
build:
context: .
dockerfile: Dockerfile
command: [ "go", "run", "/app/main.go" ]
volumes:
- type: bind
source: "."
target: "/app"
main.go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
func main() {
nc, err := nats.Connect("nats://nats:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := jetstream.New(nc)
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
if err != nil {
log.Fatal(err)
}
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "TEST",
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
log.Fatal(err)
}
cc, err := consumer.Consume(func(msg jetstream.Msg) {
fmt.Println("Message: ", string(msg.Data()))
msg.Ack()
})
if err != nil {
log.Fatal(err)
}
defer cc.Drain()
exitSignal := make(chan os.Signal, 1)
signal.Notify(exitSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-exitSignal
cc.Drain()
nc.Close()
}
go.mod
module app
go 1.23
require github.com/nats-io/nats.go v1.37.0
require (
github.com/klauspost/compress v1.17.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
nats.conf
host: 0.0.0.0
port: 4222
jetstream: enabled
debug: true
jetstream {
store_dir: /var/nats
max_mem: 1G
max_file_store: 10G
}