nsqlookupd
https://nsq.io/components/nsqlookupd.html
nsqlookupd 是管理拓扑信息的守护程序。客户端通过查询 nsqlookupd 以发现指定 topic 的 nsqd 生产者,并且 nsqd 节点广播 topic 和 channel。
命令参数
# lookupd 节点的外部地址,默认为主机名 -broadcast-address=<host> # 配置文件路径 -config=<path> # 用于 http 客户端通信的监听地址,默认为 0.0.0.0:4161 -http-address=<addr>:<port> # 生产者自上次 ping 保留在活动列表中的持续时间,默认为 5m0s -inactive-producer-timeout=5m0s # 日志消息前缀,默认为 '[nsqlookupd] ' -log-prefix='[nsqlookupd] ' # 用于 tcp 客户端通信的监听地址,默认为 0.0.0.0:4160 -tcp-address=<addr>:<port> # 生产者离线后保持注册名状态的持续时间 -tombstone-lifetime=45s # 开启详细日志 -verbose # 输出版本号 -version
启动
nsqlookupd # 测试 curl http://127.0.0.1:4161/ping
nsqd
https://nsq.io/components/nsqd.html
nsqd 是接收、排队并将消息转发到客户端的守护程序。
一个 nsqd 可以创建多个 topic,每个 topic 有多个 channel,每个 channel 都能获取该 topic 的所有消息的副本。topic 和 channel 都相互独立地缓冲数据,从而防止缓慢的消费者造成 channel 的积压(在主题级别也是如此)。一个 channel 可以连接多个消费者客户端,但每条消息会随机发送其中一个消费者。
命令参数
# 认证服务器地址,可以为多个 -auth-http-address=<addr>:<port> # 向 lookupd 注册当前 nsqd 节点的外部地址,默认为主机名 -broadcast-address=<host> # 配置文件路径 -config=<config-file> # 消息备份目录 -data-path=<path> # 是否启用客户端协商压缩,默认为 true -deflate=true # 跟踪消息处理时间百分比,范围 (0, 1.0],多次指定使用 , 分隔,默认为 none -e2e-processing-latency-percentile='1.0,0.99,0.95' # 计算时间段内点对点时间延迟,默认为 10m0s -e2e-processing-latency-window-time=60s # http 客户端监听地址,默认为 0.0.0.0:4151 -http-address=<addr>:<port> # http 客户端连接超时时间,默认为 2s -http-client-connect-timeout=2s # http 客户端请求超时时间,默认为 5s -http-client-request-timeout=5s # https 客户端监听地址,默认为 0.0.0.0:4152 -https-address=<addr>:<port> # 日志级别,可选级别有 debug、info、warn、error、fatal,默认为 info -log-level=info # 日志消息前缀,默认为 '[nsqd] ' -log-prefix='[nsqd] ' # lookupd 的 tcp 地址,可以为多个 -lookupd-tcp-address=<addr>:<port> # 单条命令最大的正文大小,默认为 5242880 -max-body-size=5242880 # 每个磁盘队列文件的最大字节数,默认为 104857600 -max-bytes-per-file=104857600 # 每个 nsqd 实例的最大 channel 消费者连接数,默认为 0,即无限制 -max-channel-consumers=0 # 客户端最大协商压缩等级,值越大 CPU 占用越高,默认为 6 -max-deflate-level=6 # 客户端间最大心跳时间间隔,默认为 1m0s -max-heartbeat-interval=1m0s # 单条消息最大字节数,默认为 1048576 -max-msg-size=1048576 # 消息的最大超时时间,默认为 15m0s -max-msg-timeout=15m0s # 客户端输出缓冲区的最大字节数,默认为 65536 -max-output-buffer-size=65536 # 缓存数据到达客户端的最大时间段,默认为 30s -max-output-buffer-timeout=30s # 缓存数据到达客户端的最小时间段,默认为 25ms -min-output-buffer-timeout=25ms # 缓存数据到达客户端的时间段,默认为 250ms -output-buffer-timeout=250ms # 客户端最大 RDY 数量,默认为 2500 -max-rdy-count=2500 # 消息重新排队的最大超时时间,默认为 1h0m0s -max-req-timeout=1h0m0s # 保留在内存中的消息数(每个主题/通道),默认为 10000 -mem-queue-size=10000 # 自动请求消息需要等待时间,默认为 1m0s -msg-timeout=1m0s # 消息 ID 的唯一部分,范围为 [0,1024) -node-id=559 # 启用客户端快速协商压缩功能,默认为 true -snappy=true # 用于推送统计信息的 statsd 守护程序的 UDP 地址 -statsd-address=<addr>:<port> # 推送到 statsd 时间间隔,默认为 1m0s -statsd-interval=1m0s # 开启发送内存和 GC 统计信息到 statsd,默认为 true -statsd-mem-stats=true # 用于发送给 statsd 的密钥的前缀,使用 %s 代表主机,默认为 'nsq.%s' -statsd-prefix='nsq.%s' # statsd UDP 数据包字节大小,默认为 508 -statsd-udp-packet-size=508 # 每个磁盘队列 fsync 的消息数 ,默认为 2500 -sync-every=2500 # 每个磁盘队列 fsync 的持续时间,默认为 2s -sync-timeout=2s # 用于和 tcp 客户端通信的监听地址,默认为 0.0.0.0:4150 -tcp-address=<addr>:<port> # 证书 cer 文件路径 -tls-cert=<path> # 客户端证书授权策略,'require' 或 'require-verify' -tls-client-auth-policy= # 证书私钥 key 文件路径 -tls-key=<path> # 可接受的最低 SSL/TLS 版本,'ssl3.0'、'tls1.0'、'tls1.1' 或 'tls1.2' -tls-min-version= # 是否要求 TLS 进行客户端连接,true、false 或 tcp-https -tls-required= # ca 文件路径 -tls-root-ca-file=<path> # 已弃用,使用 --log-level 代替 -verbose # 输出版本号 -version # 已弃用,使用 --node-id 代替 -worker-id
启动
nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160 # 测试 curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
nsq_to_file
从 nsqlookupd 备份指定 topic 的消息。
nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
nsqadmin
https://nsq.io/components/nsqadmin.html
nsqadmin 用来收集集群实时统计信息和管理任务的 WEB UI。
命令参数
# 允许 http 请求访问 /config 节点的 CIDR,默认为 127.0.0.1/8 -allow-config-from-cidr='127.0.0.1/8' # 配置文件路径 -config=<path> # graphite 的 http 地址 -graphite-url= # 用于和 http 客户端通信的监听地址,默认为 "0.0.0.0:4171 -http-address=<addr>:<port> # http 客户端连接超时时间,默认为 2s -http-client-connect-timeout=2s # http 客户端请求超时时间,默认为 5s -http-client-request-timeout=5s # 用于 http 客户端的证书 cer 文件路径 -http-client-tls-cert=<path> # 用于 http 客户端的证书私钥 key 文件路径 -http-client-tls-key=<path> # 用于 http 客户端的 ca 证书文件路径 -http-client-tls-root-ca-file=<path> # 为 http 客户端配置跳过验证 tls 证书 -http-client-tls-insecure-skip-verify # 日志消息前缀,默认为 -log-prefix='[nsqadmin] ' # lookupd 的 http 地址,可以为多个 -lookupd-http-address= # 发送管理员操作的 POST 通知到 http 端(完全限定) -notification-http-endpoint= # nsqd 的 http 地址,可以为多个 -nsqd-http-address=<addr>:<port> # 代理HTTP请求到 graphite -proxy-graphite # 格式化 statsd-counter,如果不需要格式化请将其设置为空,默认为 stats.counters.%s.count -statsd-counter-format='stats.counters.%s.count' # 格式化 statsd-gauge,如果不需要格式化请将其设置为空,默认为 stats.gauges.%s -statsd-gauge-format='stats.gauges.%s' # 推送到 statsd 时间间隔,必须与 nsqd 配置一致,默认为 1m0s -statsd-interval=1m0s # 用于发送给 statsd 的密钥的前缀,必须与 nsqd 配置一致,使用 %s 代替主机,默认为 nsq.%s -statsd-prefix='nsq.%s' # 输出版本号 -version
启动
nsqadmin --lookupd-http-address=127.0.0.1:4161
部署
使用 docker-compose 部署
https://nsq.io/deployment/docker.html
version: '3' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160:4160" - "4161:4161" networks: nsq: ipv4_address: 10.0.10.2 container_name: nsqlookupd nsqd: image: nsqio/nsq command: /nsqd --broadcast-address=10.0.10.3 --lookupd-tcp-address=nsqlookupd:4160 depends_on: - nsqlookupd ports: - "4150:4150" - "4151:4151" networks: nsq: ipv4_address: 10.0.10.3 container_name: nsqd nsq_to_file: image: nsqio/nsq command: /nsq_to_file --topic=test --lookupd-http-address=nsqlookupd:4161 depends_on: - nsqlookupd container_name: nsq_to_file networks: nsq: ipv4_address: 10.0.10.4 container_name: nsq_to_file nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 depends_on: - nsqlookupd ports: - "4171:4171" networks: nsq: ipv4_address: 10.0.10.5 container_name: nsqadmin networks: nsq: driver: bridge ipam: config: - subnet: 10.0.10.0/16
使用
生产者
package main import ( "github.com/nsqio/go-nsq" "log" "net" "os" ) var logger = log.New(os.Stdout, "[producer] ", log.LstdFlags) func main() { var err error config := nsq.NewConfig() config.LocalAddr, err = net.ResolveTCPAddr("tcp", "127.0.0.1:0") if err != nil { log.Fatalln(err) } w, err := nsq.NewProducer("127.0.0.1:4150", config) if err != nil { log.Fatalln(err) } w.SetLogger(logger, nsq.LogLevelInfo) err = w.Ping() if err != nil { log.Fatalln(err) } err = w.Publish("news_push", []byte("hello")) if err != nil { log.Fatalln(err) } w.Stop() }
消费者
如果使用 docker 搭建测试环境,且使用 lookupd 来查询 nsqd 很可能会出现如下错误:
error connecting to nsqd - dial tcp: lookup 4d846ab34d0d: Temporary failure in name resolution
出现这个问题原因如下:
启动 nsqd 时未使用 -broadcast-address 指定地址,默认为容器主机名,lookupd 查询时返回该主机名,然而在宿主机里并不能解析出对应 ip,需要在宿主机修改 hosts 记录或指定 -broadcast-address 为 nsqd 的主机 ip,注意不要修改为 127.0.0.1 ,这将导致处于不同容器 nsqadmin 去根据此 ip 访问容器本地,会使相关页面失效;另一种方法是使用 docker-compose 指定每个容器的 ip。
package main import ( "errors" "github.com/nsqio/go-nsq" "log" "os" "time" ) var logger = log.New(os.Stdout, "[consumer] ", log.LstdFlags) type Handler struct{} func (h *Handler) HandleMessage(msg *nsq.Message) error { if string(msg.Body) == "TOBEFAILED" { log.Println("failed") return errors.New("failed") } log.Println(msg.NSQDAddress, string(msg.Body)) return nil } func main() { var err error config := nsq.NewConfig() config.DefaultRequeueDelay = 0 config.MaxBackoffDuration = time.Microsecond * 50 cs, err := nsq.NewConsumer("news_push", "ch", config) if err != nil { log.Fatalln(err) } cs.SetLogger(logger, nsq.LogLevelInfo) cs.AddHandler(&Handler{}) // 直接连接 nsqd err = cs.ConnectToNSQD("127.0.0.1:4150") // 通过查询获取 nsqd 地址并连接 // err = cs.ConnectToNSQLookupd("127.0.0.1:4161") if err != nil { log.Fatalln(err) } stat := cs.Stats() if stat.Connections == 0 { log.Fatalln("0 connection") } // err = cs.DisconnectFromNSQD("127.0.0.1:4150") // err = cs.DisconnectFromNSQLookupd("127.0.0.1:4161") // if err != nil { // log.Fatalln(err) // } <-cs.StopChan }