doc.dev1x.org

NATS.io

1. NATS.ioとは

2. NATSとNATS JetStream

3. KEY-VALUEストア

4. オブジェクトストレージ

5. docker composeで動かしてみる

version: "3.5"
services:
  nats:
    image: nats
    ports:
      - "8222:8222"
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222 "
  nats-1:
    image: nats
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
    depends_on: [ "nats" ]
  nats-2:
    image: nats
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
    depends_on: [ "nats" ]
  nats-cli:
    image: natsio/nats-box
$ docker compose up
$ docker compose exec nats-cli nats sub -s nats://nats:4222 hello
$ docker compose exec nats-cli nats pub -s nats://nats:4222 hello WORLD
[#1] Received on "hello"
WORLD

6. jetstreamモードで起動する

version: "3.5"
services:
  nats:
    image: nats
    ports:
      - 4222:4222
      - 8222:8222
    command: [
      "--port",
      "4222",
      "--http_port",
      "8222",
      "--jetstream",
      "--store_dir",
      "/data",
      "--debug",
  ]
  nats-cli:
    image: natsio/nats-box

7. NATSのメッセージモデル

8. NATSのメッセージ配信モデル

9. サブスクライバの実装サンプル

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    // NATSサーバーに接続
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // メッセージを受信する処理
    _, err = nc.Subscribe("my-subject", func(msg *nats.Msg) {
        fmt.Printf("Received a message: %s\n", string(msg.Data))
        // メッセージ処理に時間がかかる場合は処理時間を調整する
        time.Sleep(time.Second * 2)
    })
    if err != nil {
        log.Fatal(err)
    }

    // シグナルを受け取るチャネル
    c := make(chan os.Signal, 1)
    // 監視するシグナルを登録 (SIGINT, SIGTERMなど)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    // シグナルを受信するまで待つ
    <-c

    // Graceful Shutdown開始
    fmt.Println("Shutting down...")

    // 新しいメッセージの受信を停止
    nc.Drain()

    // 未処理のメッセージが処理されるまで待つ
    time.Sleep(time.Second * 5)

    // NATS接続を閉じる
    nc.Close()
    fmt.Println("Exited gracefully")
}

10. 設定ファイル

nats-server -config <設定ファイル>
host: 0.0.0.0
port: 4222
jetstream: enabled
debug: true
jetstream {
    store_dir: /var/nats
    max_mem: 1G
    max_file_store: 10G
}

11. DrainとStop

12. プル型コンシューマ

cc, err := consumer.Consume(func(msg jetstream.Msg) {
    fmt.Println("Message: ", string(msg.Data()))
    msg.Ack()
})
func main() {

    nc, err := nats.Connect("nats://nats:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := jetstream.New(nc)
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    stream, err := js.Stream(ctx, "TEST")
    if err != nil {
        log.Fatal(err)
    }

    consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable:   "TEST",
        AckPolicy: jetstream.AckExplicitPolicy,
    })
    if err != nil {
        log.Fatal(err)
    }

    for {
        mb, err := consumer.FetchNoWait(5)
        if err != nil {
            fmt.Println("Error:", err)
        }
        for msg := range mb.Messages() {
            fmt.Printf("Data: %s\n", msg.Data())
            ackErr := msg.Ack()
            if ackErr != nil {
                fmt.Println("AckError:", ackErr)
            }
        }
        time.Sleep(10 * time.Second)
    }

    exitSignal := make(chan os.Signal, 1)
    signal.Notify(exitSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

    <-exitSignal
    nc.Close()
}

参考資料

Appendix-1 動作確認プロジェクト

Dockerfile

FROM golang:1.23

WORKDIR /app

ENV CGO_ENABLED=1 \
    GOOS=linux \
    GOARCH=amd64

ADD . /app

CMD ["go", "run", "/app/main.go"]

docker-compose.yaml

services:
  nats:
    image: nats
    ports:
      - "4222:4222"
    volumes:
      - type: bind
        source: "./nats.conf"
        target: "/nats.conf"
    command: ["-config", "/nats.conf"]
  nats-cli:
    image: natsio/nats-box
  go:
    build:
      context: .
      dockerfile: Dockerfile
    command: [ "go", "run", "/app/main.go" ]
    volumes:
      - type: bind
        source: "."
        target: "/app"

main.go

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {

    nc, err := nats.Connect("nats://nats:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := jetstream.New(nc)
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
        Name:     "TEST",
        Subjects: []string{"test"},
    })
    if err != nil {
        log.Fatal(err)
    }

    consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Name:      "TEST",
        AckPolicy: jetstream.AckExplicitPolicy,
    })
    if err != nil {
        log.Fatal(err)
    }

    cc, err := consumer.Consume(func(msg jetstream.Msg) {
        fmt.Println("Message: ", string(msg.Data()))
        msg.Ack()
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cc.Drain()

    exitSignal := make(chan os.Signal, 1)
    signal.Notify(exitSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

    <-exitSignal

    cc.Drain()
    nc.Close()
}

go.mod

module app

go 1.23

require github.com/nats-io/nats.go v1.37.0

require (
    github.com/klauspost/compress v1.17.2 // indirect
    github.com/nats-io/nkeys v0.4.7 // indirect
    github.com/nats-io/nuid v1.0.1 // indirect
    golang.org/x/crypto v0.18.0 // indirect
    golang.org/x/sys v0.16.0 // indirect
    golang.org/x/text v0.14.0 // indirect
)

nats.conf

host: 0.0.0.0
port: 4222
jetstream: enabled
debug: true
jetstream {
    store_dir: /var/nats
    max_mem: 1G
    max_file_store: 10G
}