NSQ with Go 初体验
May 17, 2022
NSQ 初尝
官方:https://nsq.io/overview/design.html
Github:https://github.com/nsqio/nsq
安装
brew install nsq
使用 nsqlookupd
运行起来
docker 版本
# Docker Compose stack running one nsqlookupd, one nsqd and one nsqadmin
# container, all from the nsqio/nsq image.
version: '2'
services:
  # Lookup service that nsqd and nsqadmin below are pointed at.
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      # 4160 is the port nsqd targets via --lookupd-tcp-address.
      - '4160:4160'
      # 4161 is the port nsqadmin targets via --lookupd-http-address.
      - '4161:4161'
  nsqd:
    image: nsqio/nsq
    # Registers with nsqlookupd and advertises itself under the host name
    # "nsqd", so clients on the host machine must be able to resolve "nsqd".
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --broadcast-address=nsqd
    ports:
      - '4150:4150'
      - '4151:4151'
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    ports:
      - '4171:4171'
需要在 /etc/hosts 文件中,将 nsqd 指向 127.0.0.1(因为 nsqd 通过 --broadcast-address=nsqd 对外广播的地址是主机名 nsqd)
生产者
// Connect a producer to the local nsqd and publish 1000 numbered test messages.
var producer *nsq.Producer
producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
if err != nil {
	panic(err)
}

// Verify that nsqd is reachable before publishing anything.
if err = producer.Ping(); err != nil {
	// Shut the producer down before bailing out.
	producer.Stop()
	panic(err)
}
// Stop the producer once the surrounding function returns; the original
// snippet never stopped it after the publish loop.
defer producer.Stop()
fmt.Println("ping queue success")

topic := "test"
for i := 0; i < 1000; i++ {
	message := fmt.Sprintf("message:%d", i)
	if err = producer.Publish(topic, []byte(message)); err != nil {
		// Report the failure and move on; do not print the message as if
		// it had been sent.
		fmt.Println(err)
		continue
	}
	fmt.Println(message)
	// time.Sleep(time.Second) // uncomment to throttle publishing
}
消费者
package queue
import (
"fmt"
"github.com/nsqio/go-nsq"
"time"
)
// Consumer is a stateless message handler; its HandleMessage method is what
// gets registered with the NSQ consumer.
type Consumer struct{}
// HandleMessage prints the address of the nsqd node that delivered msg along
// with the message body. It always returns nil.
func (c *Consumer) HandleMessage(msg *nsq.Message) error {
	body := string(msg.Body)
	fmt.Println("receive", msg.NSQDAddress, "message:", body)
	return nil
}
// NewConsumer creates an NSQ consumer for topic/channel, attaches a Consumer
// handler, connects through the nsqlookupd at 127.0.0.1:4161 and then blocks
// until the consumer's StopChan yields. Any setup error panics.
func NewConsumer(topic string, channel string) {
	conf := nsq.NewConfig()
	conf.LookupdPollTimeout = time.Second

	consumer, err := nsq.NewConsumer(topic, channel, conf)
	if err != nil {
		panic(err)
	}
	consumer.AddHandler(new(Consumer))

	// Discover nsqd nodes through nsqlookupd.
	err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
	if err != nil {
		panic(err)
	}

	<-consumer.StopChan
}
消息状态
- FIN
FIN 命令是最简单的,它用于告诉 nsqd 可以安全地丢弃这条消息。你可以使用 FIN 丢弃不想处理、也不想重试的消息
- REQ
REQ 命令用于告诉 nsqd 这条消息应该重新排队。
基于 NSQ 的队列实现
生产
package queue
import (
"antmons/queue/connection"
"antmons/queue/exception"
"encoding/json"
"github.com/nsqio/go-nsq"
"log"
"reflect"
"runtime"
"strings"
)
// Default NSQ endpoints used by this package.
var (
	// NSQDConfig is the nsqd address handed to connection.NewProducer.
	NSQDConfig = connection.NSQConfig{Host: "127.0.0.1", Port: "4150"}

	// NSQLookupConfig is the nsqlookupd address.
	NSQLookupConfig = connection.NSQConfig{Host: "127.0.0.1", Port: "4161"}
)
// PayloadData carries the command to run and its arguments inside a Payload.
type PayloadData struct {
	// CommandName is the short handler name.
	CommandName string
	// CommandData is the JSON-encoded job parameters, stored as a string.
	CommandData string
}
// Payload is the message body, serialised as JSON, that Dispatch publishes.
type Payload struct {
	// Queue is copied from DispatchOptions.Queue.
	Queue string
	// Job is the full runtime name of the command function; it is empty when
	// the command was given as a string.
	Job string
	// Delay is the publish delay passed to Dispatch.
	Delay uint16
	// Timeout is copied from DispatchOptions.Timeout.
	Timeout int
	// MaxAttempt is copied from DispatchOptions.MaxAttempt.
	MaxAttempt uint16
	// Data holds the command name and its encoded parameters.
	Data PayloadData
}
// DispatchOptions describes a job to be placed on a queue.
type DispatchOptions struct {
	// Command identifies the handler: either a string name or a func value.
	// NOTE(review): the author's original note says an unset Command
	// defaults to "Handle"; no such default is visible in Dispatch — confirm.
	Command any
	// Params holds the job arguments; Dispatch JSON-encodes them.
	Params any
	// Queue is the queue name recorded in the payload.
	Queue string
	// MaxAttempt is the maximum number of attempts for the job.
	MaxAttempt uint16
	// Timeout is the job processing timeout; the author documents the unit
	// as seconds, with 0 meaning no explicit timeout was requested.
	Timeout int
}
// Dispatchable is the contract for types that can enqueue jobs, either
// immediately or after a delay.
//
// NOTE(review): NSQueue.Dispatch as shown in this file has the signature
// (opt *DispatchOptions, delay uint16) and therefore does not satisfy this
// interface — confirm which shape is intended.
type Dispatchable interface {
	// Dispatch enqueues v using the given options.
	Dispatch(v any, options *DispatchOptions) error
	// DispatchWithDelay enqueues v using the given options after delay.
	DispatchWithDelay(v any, options *DispatchOptions, delay int) error
}
// NSQueue publishes jobs to a single NSQ topic.
type NSQueue struct {
	// topic is the NSQ topic every dispatched job is published to.
	topic string
	// producer is the NSQ producer used for publishing.
	producer *nsq.Producer
}
// NewNSQueue returns a queue that publishes to topic through a producer
// created from NSQDConfig. If the producer cannot be created the error is
// logged and nil is returned, so callers must check the result.
func NewNSQueue(topic string) *NSQueue {
	p, err := connection.NewProducer(NSQDConfig)
	if err != nil {
		log.Println(err)
		return nil
	}
	return &NSQueue{topic: topic, producer: p}
}
// Dispatch serialises the job described by opt into a Payload and hands it to
// connection.RegisterMessage for publishing on the queue's topic, together
// with delay.
//
// opt.Command may be a string (used verbatim as the command name) or a func
// value (the command name is the last dot-separated segment of its runtime
// name, with any "-..." suffix removed). A nil Command leaves both names empty.
//
// It returns exception.MaxAttemptsOverflow when opt.MaxAttempt exceeds
// connection.MaxAttempts, a *reflect.ValueError when Command is neither a
// string nor a func, any JSON encoding error, and otherwise the result of
// connection.RegisterMessage.
func (q *NSQueue) Dispatch(opt *DispatchOptions, delay uint16) error {
	if opt.MaxAttempt > connection.MaxAttempts {
		return exception.MaxAttemptsOverflow
	}

	cmdName, cmdFullName, err := resolveCommand(opt.Command)
	if err != nil {
		return err
	}

	// Propagate encoding failures instead of silently queueing a job with
	// empty parameters.
	params, err := json.Marshal(opt.Params)
	if err != nil {
		return err
	}

	body, err := json.Marshal(Payload{
		Queue:      opt.Queue,
		Job:        cmdFullName,
		Delay:      delay,
		Timeout:    opt.Timeout,
		MaxAttempt: opt.MaxAttempt,
		Data: PayloadData{
			CommandName: cmdName,
			CommandData: string(params),
		},
	})
	if err != nil {
		return err
	}
	return connection.RegisterMessage(q.producer, q.topic, body, delay)
}

// resolveCommand maps a DispatchOptions.Command value to the short command
// name and, for func values, the full runtime function name.
func resolveCommand(command any) (name, fullName string, err error) {
	switch cmd := command.(type) {
	case nil:
		return "", "", nil
	case string:
		return cmd, "", nil
	}

	fn := reflect.ValueOf(command)
	if fn.Kind() != reflect.Func {
		// Value.Pointer would panic (or return a meaningless address) for
		// other kinds; report the same condition as an error instead.
		return "", "", &reflect.ValueError{Method: "queue.Dispatch", Kind: fn.Kind()}
	}

	fullName = runtime.FuncForPC(fn.Pointer()).Name()
	// Method values carry a "-fm" style suffix; keep only what precedes it.
	base, _, _ := strings.Cut(fullName, "-")
	name = base[strings.LastIndex(base, ".")+1:]
	return name, fullName, nil
}
package connection
import (
"fmt"
"github.com/nsqio/go-nsq"
"time"
)
// NSQConfig is the host/port pair of an NSQ endpoint.
type NSQConfig struct {
	// Host is the host name or IP address.
	Host string
	// Port is the port number, kept as a string.
	Port string
}
// MaxAttempts is the attempt limit applied to consumers created by this
// package.
var MaxAttempts = uint16(10)
// NewProducer creates an NSQ producer for the nsqd at c.Host:c.Port and
// verifies the connection with Ping.
//
// It returns the producer on success. If creation or the ping fails it
// returns a nil producer and the error; a producer whose ping failed is
// stopped first so that it is not leaked.
func NewProducer(c NSQConfig) (*nsq.Producer, error) {
	addr := fmt.Sprintf("%s:%s", c.Host, c.Port)
	producer, err := nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		return nil, err
	}
	if err := producer.Ping(); err != nil {
		// The caller never sees this producer, so release it here.
		producer.Stop()
		return nil, err
	}
	return producer, nil
}
// NewConsumer builds an NSQ consumer for topic/channel with a one second
// LookupdPollTimeout and the package-wide MaxAttempts limit. The consumer is
// returned without a handler and without being connected.
func NewConsumer(topic string, channel string) (*nsq.Consumer, error) {
	conf := nsq.NewConfig()
	conf.MaxAttempts = MaxAttempts
	conf.LookupdPollTimeout = time.Second

	c, err := nsq.NewConsumer(topic, channel, conf)
	if err == nil {
		return c, nil
	}
	return nil, err
}
// RegisterMessage publishes payload to topic through producer. When delay is
// greater than zero the message is published with DeferredPublish using a
// delay of that many seconds; otherwise it is published immediately.
//
// It returns the error reported by the publish call, so callers learn when a
// message was not accepted.
func RegisterMessage(producer *nsq.Producer, topic string, payload []byte, delay uint16) error {
	if delay > 0 {
		return producer.DeferredPublish(topic, time.Duration(delay)*time.Second, payload)
	}
	return producer.Publish(topic, payload)
}
使用案例:
// Push a "send e-mail" job onto the queue.
// NOTE(review): NewNSQueue returns nil when the producer cannot be created,
// so q should be nil-checked before use, and err below should be handled.
q := queue.NewNSQueue("email")
err := q.Dispatch(&queue.DispatchOptions{
	MaxAttempt: 3,
	Command:    job.SendRegisterEMail,
	Timeout:    10,
	Queue:      "topic",
	Params:     map[string]any{"code": code, "to": to},
}, 0) // a delay of 0 publishes immediately
消费
package main
import (
"antmons/config"
"antmons/job"
"antmons/queue"
"antmons/queue/connection"
"antmons/repository"
"encoding/json"
"errors"
"fmt"
"github.com/fatih/color"
"github.com/joho/godotenv"
"github.com/nsqio/go-nsq"
"github.com/spf13/cobra"
"reflect"
"time"
)
// main builds the root cobra command, registers the queue:start sub-command
// on it and runs the CLI. The error returned by Execute is discarded.
func main() {
	root := new(cobra.Command)
	root.AddCommand(CommandQueueStart)
	root.Execute()
}
// Consumer is the stateless NSQ message handler of the queue worker.
type Consumer struct{}
// queues maps a command name to the job function registered for it.
var queues = map[string]any{}
// HandleMessage decodes msg.Body into a queue.Payload, runs the referenced
// job in a separate goroutine and waits for it to finish or time out.
//
// The wait is payload.Timeout seconds when that value is positive and five
// minutes otherwise. A failed job is recorded as a repository.FailedJob and
// requeued after one second while msg.Attempts is below payload.MaxAttempt.
//
// It returns the JSON error for an undecodable body, "exec failed" when the
// job returned an error, "time out" when the job did not finish in time, and
// nil on success.
func (c *Consumer) HandleMessage(msg *nsq.Message) error {
	payload := &queue.Payload{}
	if err := json.Unmarshal(msg.Body, payload); err != nil {
		return err
	}

	// Once the attempt budget is used up the message is finished before the
	// job runs — presumably so it cannot be redelivered whatever the
	// outcome; confirm against go-nsq's response handling.
	if msg.Attempts >= payload.MaxAttempt {
		msg.Finish()
	}

	timeout := 5 * time.Minute
	if payload.Timeout > 0 {
		// DispatchOptions documents Timeout in seconds.
		timeout = time.Duration(payload.Timeout) * time.Second
	}

	// Buffered so the job goroutine can always deliver its result and exit,
	// even after this handler has given up waiting. The channel is never
	// closed here: the goroutine is the only sender, and closing it on
	// timeout would make its later send panic.
	doneChan := make(chan error, 1)
	go handleJob(payload, doneChan)

	select {
	case err := <-doneChan:
		if err == nil {
			return nil
		}

		db := repository.GetSharedDB()
		fJob := &repository.FailedJob{
			MessageID: string(msg.ID[:]),
			Payload:   msg.Body,
			Queue:     payload.Queue,
			Exception: err.Error(),
			FailedAt:  time.Now(),
		}
		db.Model(&repository.FailedJob{}).Where("message_id = ?", fJob.MessageID).Where("queue = ?", fJob.Queue).FirstOrCreate(fJob)

		if msg.Attempts < payload.MaxAttempt {
			msg.Requeue(time.Second)
		}
		return errors.New("exec failed")
	case <-time.After(timeout):
		// The job goroutine is not cancelled; it keeps running in the
		// background and its result is dropped.
		return errors.New("time out")
	}
}
// handleJob looks up the function registered under payload.Data.CommandName,
// calls it through reflection with payload.Data.CommandData as its only
// argument, and sends exactly one result on doneCh.
//
// The result is the job's first return value when it is a non-nil error, an
// error when no function is registered under that name or when the call
// panics (for example on a signature mismatch), and nil otherwise.
func handleJob(payload *queue.Payload, doneCh chan error) {
	name := payload.Data.CommandName

	// A misbehaving job must not take the whole worker process down, so a
	// panic is turned into a job failure.
	defer func() {
		if r := recover(); r != nil {
			doneCh <- fmt.Errorf("job %q panicked: %v", name, r)
		}
	}()

	fn := reflect.ValueOf(queues[name])
	if fn.Kind() != reflect.Func {
		doneCh <- errors.New("function no found")
		return
	}

	args := []reflect.Value{reflect.ValueOf(payload.Data.CommandData)}
	results := fn.Call(args)

	// A job without return values counts as success.
	if len(results) == 0 {
		doneCh <- nil
		return
	}
	if err, ok := results[0].Interface().(error); ok {
		doneCh <- err
		return
	}
	doneCh <- nil
}
// RegisterJob stores fn in the queues map under name, replacing any entry
// already registered with that name.
func RegisterJob(name string, fn any) {
	queues[name] = fn
}
// CommandQueueStart is the "queue:start" sub-command. It consumes the NSQ
// topic given as its first argument until the consumer stops.
var CommandQueueStart = &cobra.Command{
	Use:   "queue:start queue_name",
	Short: "start a job queue",
	Long:  "this command is used to start the job queue from command line",
	Args:  cobra.MinimumNArgs(0),
	Run:   runQueueStart,
}

// runQueueStart implements CommandQueueStart: it loads the environment and
// global configuration, registers the known jobs, subscribes to the topic
// named by args[0] on the "command" channel via nsqlookupd and blocks until
// the consumer's StopChan yields. Setup errors panic.
func runQueueStart(cmd *cobra.Command, args []string) {
	if len(args) == 0 {
		cmd.Println("【错误】必须输入待执行的队列名, 例如: queue:start search")
		return
	}

	// Load the .env file.
	if err := godotenv.Load(); err != nil {
		panic(err)
	}
	color.Blue("「.env」 文件加载成功!")

	// Initialise global configuration variables.
	config.InitVariable()
	color.Blue("「全局配置变量」 加载成功!")

	// Register the jobs this worker can execute.
	RegisterJob("SendRegisterEMail", job.SendRegisterEMail)

	consumer, err := connection.NewConsumer(args[0], "command")
	if err != nil {
		panic(err)
	}
	consumer.AddHandler(&Consumer{})

	// Connect through nsqlookupd.
	lookupd := queue.NSQLookupConfig
	err = consumer.ConnectToNSQLookupd(fmt.Sprintf("%s:%s", lookupd.Host, lookupd.Port))
	if err != nil {
		panic(err)
	}

	<-consumer.StopChan
	fmt.Println("all close")
}
这里使用的是命令行去消费队列中的消息,使用案例:
$ go run cmd.go queue:start email