doc.dev1x.org

NATS.io

1. NATS.ioとは

2. NATSとNATS JetStream

3. KEY-VALUEストア

4. オブジェクトストレージ

5. docker composeで動かしてみる

version: "3.5"
services:
  nats:
    image: nats
    ports:
      - "8222:8222"
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222 "
  nats-1:
    image: nats
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
    depends_on: [ "nats" ]
  nats-2:
    image: nats
    command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222"
    depends_on: [ "nats" ]
  nats-cli:
    image: natsio/nats-box
$ docker compose up
$ docker compose run nats-cli nats sub -s nats://nats:4222 hello
$ docker compose run nats-cli nats pub -s nats://nats:4222 hello WORLD
[#1] Received on "hello"
WORLD

6. jetstreamモードで起動する

version: "3.5"
services:
  nats:
    image: nats
    ports:
      - 4222:4222
      - 8222:8222
    command: [
      "--port",
      "4222",
      "--http_port",
      "8222",
      "--jetstream",
      "--store_dir",
      "/data",
      "--debug",
  ]
  nats-cli:
    image: natsio/nats-box

7. NATSのメッセージモデル

8. NATSのメッセージ配信モデル

9. サブスクライバの実装サンプル

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    // NATSサーバーに接続
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // メッセージを受信する処理
    _, err = nc.Subscribe("my-subject", func(msg *nats.Msg) {
        fmt.Printf("Received a message: %s\n", string(msg.Data))
        // メッセージ処理に時間がかかる場合は処理時間を調整する
        time.Sleep(time.Second * 2)
    })
    if err != nil {
        log.Fatal(err)
    }

    // シグナルを受け取るチャネル
    c := make(chan os.Signal, 1)
    // 監視するシグナルを登録 (SIGINT, SIGTERMなど)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    // シグナルを受信するまで待つ
    <-c

    // Graceful Shutdown開始
    fmt.Println("Shutting down...")

    // 新しいメッセージの受信を停止
    nc.Drain()

    // 未処理のメッセージが処理されるまで待つ
    time.Sleep(time.Second * 5)

    // NATS接続を閉じる
    nc.Close()
    fmt.Println("Exited gracefully")
}

参考資料