nsq 在 golang 中使用

文章目录 (?) [+]

    nsqlookupd

    https://nsq.io/components/nsqlookupd.html

    nsqlookupd 是管理拓扑信息的守护程序。客户端通过查询 nsqlookupd 以发现指定 topic 的 nsqd 生产者,并且 nsqd 节点广播 topic 和 channel。

    命令参数

    # lookupd 节点的外部地址,默认为主机名
    -broadcast-address=<host>
    
    # 配置文件路径
    -config=<path>
    
    # 用于 http 客户端通信的监听地址,默认为 0.0.0.0:4161
    -http-address=<addr>:<port>
    
    # 生产者自上次 ping 保留在活动列表中的持续时间,默认为 5m0s
    -inactive-producer-timeout=5m0s
    
    # 日志消息前缀,默认为 '[nsqlookupd] '
    -log-prefix='[nsqlookupd] '
    
    # 用于 tcp 客户端通信的监听地址,默认为 0.0.0.0:4160
    -tcp-address=<addr>:<port>
    
    # 生产者离线后保持注册名状态的持续时间
    -tombstone-lifetime=45s
    
    # 开启详细日志
    -verbose
    
    # 输出版本号
    -version

    启动

    nsqlookupd
    
    # 测试
    curl http://127.0.0.1:4161/ping


    nsqd

    https://nsq.io/components/nsqd.html

    nsqd 是接收、排队并将消息转发到客户端的守护程序。

    一个 nsqd 可以创建多个 topic,每个 topic 有多个 channel,每个 channel 都能获取该 topic 的所有消息的副本。topic 和 channel 都相互独立地缓冲数据,从而防止缓慢的消费者造成 channel 的积压(在主题级别也是如此)。一个 channel 可以连接多个消费者客户端,但每条消息会随机发送其中一个消费者。

    命令参数

    # 认证服务器地址,可以为多个
    -auth-http-address=<addr>:<port>
    
    # 向 lookupd 注册当前 nsqd 节点的外部地址,默认为主机名
    -broadcast-address=<host>
    
    # 配置文件路径
    -config=<config-file>
    
    # 消息备份目录
    -data-path=<path>
    
    # 是否启用客户端协商压缩,默认为 true
    -deflate=true
    
    # 跟踪消息处理时间百分比,范围 (0, 1.0],多次指定使用 , 分隔,默认为 none
    -e2e-processing-latency-percentile='1.0,0.99,0.95'
    
    # 计算时间段内点对点时间延迟,默认为 10m0s
    -e2e-processing-latency-window-time=60s
    
    # http 客户端监听地址,默认为 0.0.0.0:4151
    -http-address=<addr>:<port>
    
    # http 客户端连接超时时间,默认为 2s
    -http-client-connect-timeout=2s
    
    # http 客户端请求超时时间,默认为 5s
    -http-client-request-timeout=5s
    
    # https 客户端监听地址,默认为 0.0.0.0:4152
    -https-address=<addr>:<port>
    
    # 日志级别,可选级别有 debug、info、warn、error、fatal,默认为 info
    -log-level=info
    
    # 日志消息前缀,默认为 '[nsqd] '
    -log-prefix='[nsqd] '
    
    # lookupd 的 tcp 地址,可以为多个
    -lookupd-tcp-address=<addr>:<port>
    
    # 单条命令最大的正文大小,默认为 5242880
    -max-body-size=5242880
    
    # 每个磁盘队列文件的最大字节数,默认为 104857600
    -max-bytes-per-file=104857600
    
    # 每个 nsqd 实例的最大 channel 消费者连接数,默认为 0,即无限制
    -max-channel-consumers=0
    
    # 客户端最大协商压缩等级,值越大 CPU 占用越高,默认为 6
    -max-deflate-level=6
    
    # 客户端间最大心跳时间间隔,默认为 1m0s
    -max-heartbeat-interval=1m0s
    
    # 单条消息最大字节数,默认为 1048576
    -max-msg-size=1048576
    
    # 消息的最大超时时间,默认为 15m0s
    -max-msg-timeout=15m0s
    
    # 客户端输出缓冲区的最大字节数,默认为 65536
    -max-output-buffer-size=65536
    
    #  缓存数据到达客户端的最大时间段,默认为 30s
    -max-output-buffer-timeout=30s
    
    #  缓存数据到达客户端的最小时间段,默认为 25ms
    -min-output-buffer-timeout=25ms
    
    # 缓存数据到达客户端的时间段,默认为 250ms
    -output-buffer-timeout=250ms
    
    # 客户端最大 RDY 数量,默认为 2500
    -max-rdy-count=2500
    
    # 消息重新排队的最大超时时间,默认为 1h0m0s
    -max-req-timeout=1h0m0s
    
    # 保留在内存中的消息数(每个主题/通道),默认为 10000
    -mem-queue-size=10000
    
    # 自动请求消息需要等待时间,默认为 1m0s
    -msg-timeout=1m0s
    
    # 消息 ID 的唯一部分,范围为 [0,1024)
    -node-id=559
    
    # 启用客户端快速协商压缩功能,默认为 true
    -snappy=true
    
    # 用于推送统计信息的 statsd 守护程序的 UDP 地址
    -statsd-address=<addr>:<port>
    
    # 推送到 statsd 时间间隔,默认为 1m0s
    -statsd-interval=1m0s
    
    # 开启发送内存和 GC 统计信息到 statsd,默认为 true
    -statsd-mem-stats=true
    
    # 用于发送给 statsd 的密钥的前缀,使用 %s 代表主机,默认为 'nsq.%s'
    -statsd-prefix='nsq.%s'
    
    # statsd UDP 数据包字节大小,默认为 508
    -statsd-udp-packet-size=508
    
    # 每个磁盘队列 fsync 的消息数 ,默认为 2500
    -sync-every=2500
    
    # 每个磁盘队列 fsync 的持续时间,默认为 2s
    -sync-timeout=2s
    
    # 用于和 tcp 客户端通信的监听地址,默认为 0.0.0.0:4150
    -tcp-address=<addr>:<port>
    
    # 证书 cer 文件路径
    -tls-cert=<path>
    
    # 客户端证书授权策略,'require' 或 'require-verify'
    -tls-client-auth-policy=
    
    # 证书私钥 key 文件路径
    -tls-key=<path>
    
    # 可接受的最低 SSL/TLS 版本,'ssl3.0'、'tls1.0'、'tls1.1' 或 'tls1.2'
    -tls-min-version=
    
    # 是否要求 TLS 进行客户端连接,true、false 或 tcp-https
    -tls-required=
    
    # ca 文件路径
    -tls-root-ca-file=<path>
    
    # 已弃用,使用 --log-level 代替
    -verbose
    
    # 输出版本号
    -version
    
    # 已弃用,使用 --node-id 代替
    -worker-id

    启动

    nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160
    
    # 测试
    curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'


    nsq_to_file

    从 nsqlookupd 备份指定 topic 的消息。

    nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161


    nsqadmin

    https://nsq.io/components/nsqadmin.html

    nsqadmin 用来收集集群实时统计信息和管理任务的 WEB UI。

    命令参数

    # 允许 http 请求访问 /config 节点的 CIDR,默认为 127.0.0.1/8
    -allow-config-from-cidr='127.0.0.1/8'
    
    # 配置文件路径
    -config=<path>
    
    # graphite 的 http 地址
    -graphite-url=
    
    # 用于和 http 客户端通信的监听地址,默认为 "0.0.0.0:4171
    -http-address=<addr>:<port>
    
    # http 客户端连接超时时间,默认为 2s
    -http-client-connect-timeout=2s
    
    # http 客户端请求超时时间,默认为 5s
    -http-client-request-timeout=5s
    
    # 用于 http 客户端的证书 cer 文件路径
    -http-client-tls-cert=<path>
    
    # 用于 http 客户端的证书私钥 key 文件路径
    -http-client-tls-key=<path>
    
    # 用于 http 客户端的 ca 证书文件路径
    -http-client-tls-root-ca-file=<path>
    
    # 为 http 客户端配置跳过验证 tls 证书
    -http-client-tls-insecure-skip-verify
    
    # 日志消息前缀,默认为 
    -log-prefix='[nsqadmin] '
    
    # lookupd 的 http 地址,可以为多个
    -lookupd-http-address=
    
    # 发送管理员操作的 POST 通知到 http 端(完全限定)
    -notification-http-endpoint=
    
    # nsqd 的 http 地址,可以为多个
    -nsqd-http-address=<addr>:<port>
    
    # 代理HTTP请求到 graphite
    -proxy-graphite
    
    # 格式化 statsd-counter,如果不需要格式化请将其设置为空,默认为 stats.counters.%s.count
    -statsd-counter-format='stats.counters.%s.count'
    
    # 格式化 statsd-gauge,如果不需要格式化请将其设置为空,默认为 stats.gauges.%s
    -statsd-gauge-format='stats.gauges.%s'
    
    # 推送到 statsd 时间间隔,必须与 nsqd 配置一致,默认为 1m0s
    -statsd-interval=1m0s
    
    # 用于发送给 statsd 的密钥的前缀,必须与 nsqd 配置一致,使用 %s 代替主机,默认为 nsq.%s
    -statsd-prefix='nsq.%s'
    
    # 输出版本号
    -version

    启动

    nsqadmin --lookupd-http-address=127.0.0.1:4161


    部署

    使用 docker-compose 部署

    https://nsq.io/deployment/docker.html

    version: '3'
    services:
      nsqlookupd:
        image: nsqio/nsq
        command: /nsqlookupd
        ports:
          - "4160:4160"
          - "4161:4161"
        networks:
          nsq:
            ipv4_address: 10.0.10.2
        container_name: nsqlookupd
      nsqd:
        image: nsqio/nsq
        command: /nsqd --broadcast-address=10.0.10.3 --lookupd-tcp-address=nsqlookupd:4160
        depends_on:
          - nsqlookupd
        ports:
          - "4150:4150"
          - "4151:4151"
        networks:
          nsq:
            ipv4_address: 10.0.10.3
        container_name: nsqd
      nsq_to_file:
        image: nsqio/nsq
        command: /nsq_to_file --topic=test --lookupd-http-address=nsqlookupd:4161
        depends_on:
          - nsqlookupd
        container_name: nsq_to_file
        networks:
          nsq:
            ipv4_address: 10.0.10.4
        container_name: nsq_to_file
      nsqadmin:
        image: nsqio/nsq
        command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
        depends_on:
          - nsqlookupd  
        ports:
          - "4171:4171"
        networks:
          nsq:
            ipv4_address: 10.0.10.5
        container_name: nsqadmin
    
    networks:
      nsq:
        driver: bridge
        ipam:
          config:
            - subnet: 10.0.10.0/16


    使用

    生产者

    package main
    
    import (
        "github.com/nsqio/go-nsq"
        "log"
        "net"
        "os"
    )
    
    var logger = log.New(os.Stdout, "[producer] ", log.LstdFlags)
    
    func main() {
        var err error
        config := nsq.NewConfig()
        config.LocalAddr, err = net.ResolveTCPAddr("tcp", "127.0.0.1:0")
        if err != nil {
            log.Fatalln(err)
        }
    
        w, err := nsq.NewProducer("127.0.0.1:4150", config)
        if err != nil {
            log.Fatalln(err)
        }
        w.SetLogger(logger, nsq.LogLevelInfo)
        err = w.Ping()
        if err != nil {
            log.Fatalln(err)
        }
    
        err = w.Publish("news_push", []byte("hello"))
        if err != nil {
            log.Fatalln(err)
        }
        w.Stop()
    }

    消费者

    如果使用 docker 搭建测试环境,且使用 lookupd 来查询 nsqd 很可能会出现如下错误:

    error connecting to nsqd - dial tcp: lookup 4d846ab34d0d: Temporary failure in name resolution

    出现这个问题原因如下:

    启动 nsqd 时未使用 -broadcast-address 指定地址,默认为容器主机名,lookupd 查询时返回该主机名,然而在宿主机里并不能解析出对应 ip,需要在宿主机修改 hosts 记录或指定 -broadcast-address 为 nsqd 的主机 ip,注意不要修改为 127.0.0.1 ,这将导致处于不同容器 nsqadmin 去根据此 ip 访问容器本地,会使相关页面失效;另一种方法是使用 docker-compose 指定每个容器的 ip。

    package main
    
    import (
        "errors"
        "github.com/nsqio/go-nsq"
        "log"
        "os"
        "time"
    )
    
    var logger = log.New(os.Stdout, "[consumer] ", log.LstdFlags)
    
    type Handler struct{}
    
    func (h *Handler) HandleMessage(msg *nsq.Message) error {
        if string(msg.Body) == "TOBEFAILED" {
            log.Println("failed")
    
            return errors.New("failed")
        }
    
        log.Println(msg.NSQDAddress, string(msg.Body))
    
        return nil
    }
    
    func main() {
        var err error
        config := nsq.NewConfig()
        config.DefaultRequeueDelay = 0
        config.MaxBackoffDuration = time.Microsecond * 50
    
        cs, err := nsq.NewConsumer("news_push", "ch", config)
        if err != nil {
            log.Fatalln(err)
        }
        cs.SetLogger(logger, nsq.LogLevelInfo)
        cs.AddHandler(&Handler{})
    
        // 直接连接 nsqd
        err = cs.ConnectToNSQD("127.0.0.1:4150")
    
        // 通过查询获取 nsqd 地址并连接
        // err = cs.ConnectToNSQLookupd("127.0.0.1:4161")
        if err != nil {
            log.Fatalln(err)
        }
    
        stat := cs.Stats()
        if stat.Connections == 0 {
            log.Fatalln("0 connection")
        }
    
        // err = cs.DisconnectFromNSQD("127.0.0.1:4150")
        // err = cs.DisconnectFromNSQLookupd("127.0.0.1:4161")
        // if err != nil {
        //     log.Fatalln(err)
        // }
    
        <-cs.StopChan
    }


    本文标题:nsq 在 golang 中使用
    本文链接:https://www.lanseyujie.com/post/application-of-nsq-in-golang.html
    版权声明:本文使用「署名-非商业性使用-相同方式共享」创作共享协议,转载或使用请遵守署名协议。
    点赞 0 分享 0