跳转到内容
View in the app

A better way to browse. Learn more.

彼岸论坛

A full-screen app on your home screen with push notifications, badges and more.

To install this app on iOS and iPadOS
  1. Tap the Share icon in Safari
  2. Scroll the menu and tap Add to Home Screen.
  3. Tap Add in the top-right corner.
To install this app on Android
  1. Tap the 3-dot menu (⋮) in the top-right corner of the browser.
  2. Tap Add to Home screen or Install app.
  3. Confirm by tapping Install.
欢迎抵达彼岸 彼岸花开 此处谁在 -彼岸论坛

[程序员] 求助: GoLang MQTT 客户端使用问题

发表于

我遇到的问题:

2024-08-29 07:13:10	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_5 , error = pingresp not received, disconnecting
2024-08-29 07:13:14	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_88 , error = pingresp not received, disconnecting
2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_43 , error = pingresp not received, disconnecting
2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_72 , error = pingresp not received, disconnecting
2024-08-29 07:13:15	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_1 , error = pingresp not received, disconnecting
2024-08-29 07:13:17	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_37 , error = pingresp not received, disconnecting
2024-08-29 07:13:18	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_10 , error = pingresp not received, disconnecting
2024-08-29 07:14:13	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_52 , error = pingresp not received, disconnecting
2024-08-29 07:14:18	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_59 , error = pingresp not received, disconnecting
2024-08-29 07:14:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_84 , error = pingresp not received, disconnecting
2024-08-29 07:14:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_54 , error = pingresp not received, disconnecting
2024-08-29 07:14:21	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_22 , error = pingresp not received, disconnecting
2024-08-29 07:14:22	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_12 , error = pingresp not received, disconnecting
2024-08-29 07:14:23	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_25 , error = pingresp not received, disconnecting
2024-08-29 07:14:24	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:14:26	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_36 , error = pingresp not received, disconnecting
2024-08-29 07:15:08	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_63 , error = pingresp not received, disconnecting
2024-08-29 07:15:16	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_23 , error = pingresp not received, disconnecting
2024-08-29 07:15:19	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_96 , error = pingresp not received, disconnecting
2024-08-29 07:15:20	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_50 , error = pingresp not received, disconnecting
2024-08-29 07:15:25	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_75 , error = pingresp not received, disconnecting
2024-08-29 07:15:30	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_78 , error = pingresp not received, disconnecting
2024-08-29 07:15:36	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_7 , error = pingresp not received, disconnecting
2024-08-29 07:15:39	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:16:17	error	go-iot/mqtt_service.go:40	mqtt connection lost id = TT_79 , error = pingresp not received, disconnecting

这是我正在使用的程序代码

package main

import (
	"encoding/json"
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"
	"sync"
	"time"
)

// MqttInterface bundles a paho MQTT client with the channel that carries the
// messages it receives.
//
// Despite the name it is a concrete struct, not an interface type. It contains
// a sync.WaitGroup by value, so it should be shared by pointer and not copied.
type MqttInterface struct {
	// client is the underlying paho client; Connect assigns it on success.
	client mqtt.Client
	// Id is used as the MQTT client ID and to tag logs and forwarded messages.
	Id     string
	// Chan receives the JSON-encoded messages produced by the Subscribe callback.
	Chan   chan []byte
	// Config is handed to StopMqttClient when the connection is lost.
	Config MqttConfig
	// wg is incremented and decremented around each Subscribe callback run.
	wg     sync.WaitGroup
}

// NewMqttClient allocates an MqttInterface for the given client id and config.
// The message channel is created with a buffer of 1000 entries. The returned
// value is not connected yet; call Connect before using it.
func NewMqttClient(id string, config MqttConfig) *MqttInterface {
	m := new(MqttInterface)
	m.Id = id
	m.Config = config
	m.Chan = make(chan []byte, 1000)
	return m
}

// Connect dials the MQTT broker at tcp://host:port with the given credentials
// and, on success, stores the connected client on m.
//
// Automatic reconnection is disabled. When the connection drops, the loss is
// logged and StopMqttClient is called with this instance's Id and Config.
// The keep-alive interval is 60 seconds and ordered delivery is switched off.
//
// It returns the error reported by the connect token, or nil on success.
func (m *MqttInterface) Connect(host, username, password string, port int) error {
	brokerURL := fmt.Sprintf("tcp://%s:%d", host, port)

	opts := mqtt.NewClientOptions().
		AddBroker(brokerURL).
		SetClientID(m.Id).
		SetUsername(username).
		SetPassword(password).
		SetAutoReconnect(false).
		SetOrderMatters(false).
		SetKeepAlive(60 * time.Second)
	// The default publish handler is currently not installed:
	//opts.SetDefaultPublishHandler(m.messageHandler)
	opts.OnConnectionLost = func(_ mqtt.Client, err error) {
		zap.S().Errorf("mqtt connection lost id = %s , error = %+v", m.Id, err)
		StopMqttClient(m.Id, m.Config)
	}

	// Create the client and wait for the connect attempt to finish.
	client := mqtt.NewClient(opts)
	connectToken := client.Connect()
	if done := connectToken.Wait(); done {
		if err := connectToken.Error(); err != nil {
			return err
		}
	}

	m.client = client
	return nil
}

// messageHandler has the shape of a paho message callback, but its body is
// empty, so any message passed to it is ignored. The only visible reference to
// it is the commented-out SetDefaultPublishHandler call in Connect.
func (m *MqttInterface) messageHandler(client mqtt.Client, msg mqtt.Message) {

}

// Subscribe registers a QoS 0 subscription for topics and forwards every
// received message to m.Chan.
//
// Each message is wrapped in an MQTTMessage carrying this client's Id and the
// payload as a string, JSON-encoded, and sent on m.Chan. The send blocks while
// the channel buffer is full, so a consumer must keep draining m.Chan.
// A message that cannot be JSON-encoded is logged and dropped rather than
// pushing a nil payload downstream.
//
// Subscribe waits on the subscribe token and returns the token's error, if any.
func (m *MqttInterface) Subscribe(topics string) error {
	onMessage := func(_ mqtt.Client, msg mqtt.Message) {
		m.wg.Add(1)
		defer m.wg.Done()

		jsonData, err := json.Marshal(MQTTMessage{
			MQTTClientID: m.Id,
			Message:      string(msg.Payload()),
		})
		if err != nil {
			zap.S().Errorf("mqtt marshal message failed id = %s , topic = %s , error = %+v", m.Id, msg.Topic(), err)
			return
		}

		m.Chan <- jsonData
	}

	token := m.client.Subscribe(topics, 0, onMessage)
	if token.Wait() && token.Error() != nil {
		// Log the text as a plain message; using it as a format string would
		// mangle any '%' the error happens to contain.
		zap.S().Error(token.Error().Error())
		return token.Error()
	}
	return nil
}

// Publish sends payload to topic with QoS 0 and the retained flag off, then
// waits for the publish token to complete. The token's error is not inspected,
// so a failed publish is not reported to the caller.
func (m *MqttInterface) Publish(topic string, payload interface{}) {
	m.client.Publish(topic, 0, false, payload).Wait()
}

// Disconnect closes the connection to the MQTT broker, passing 250 as the
// quiesce argument of the underlying client's Disconnect call.
func (m *MqttInterface) Disconnect() {
	const quiesce = 250
	m.client.Disconnect(quiesce)
}

// HandlerMsg drains m.Chan, calling PushToQueue("pre_handler", payload) for
// every payload received. It blocks while waiting for messages, so it is meant
// to run in its own goroutine, and it returns once m.Chan has been closed and
// emptied.
func (m *MqttInterface) HandlerMsg() {
	// Ranging over the channel lets the goroutine exit when the channel is
	// closed; a bare receive in an endless loop would spin on nil payloads.
	for payload := range m.Chan {
		PushToQueue("pre_handler", payload)
	}
}

创建 MQTT 客户端和开启订阅

	// Build the wrapper and connect; give up (returning false) if the broker
	// cannot be reached.
	client := NewMqttClient(clientId,config)
	err := client.Connect(broker, username, password, port)
	if err != nil {
		zap.S().Errorf("mqtt connect err = %v", err)
        return false
	}
	// Subscribe is started in its own goroutine, so the error it returns is
	// discarded here; a failure is only visible through Subscribe's own log line.
	go client.Subscribe(subTopic)
	// HandlerMsg loops over client.Chan and forwards each payload to PushToQueue.
	go client.HandlerMsg()
    

请问这个问题应该如何解决?

我的尝试

  1. 我发起了一个 Issue(https://github.com/eclipse/paho.mqtt.golang/issues/686),我的理解是应当让消息在接收后进行异步处理。

  2. 修改程序如下

	// Variant of the Subscribe callback that hands each message to a new
	// goroutine so the callback itself returns immediately.
	var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
		go func() {
			mqttMsg := MQTTMessage{
				MQTTClientID: m.Id,
				Message:      string(msg.Payload()),
			}
			// The json.Marshal error is discarded here.
			jsonData, _ := json.Marshal(mqttMsg)

			// This send still blocks while m.Chan is full, but only the
			// spawned goroutine waits, not the callback.
			m.Chan <- jsonData
		}()

	})

上述两种尝试均未能解决问题。请问应当如何解决这个问题?

Featured Replies

No posts to show

创建帐户或登录来提出意见

Configure browser push notifications

Chrome (Android)
  1. Tap the lock icon next to the address bar.
  2. Tap Permissions → Notifications.
  3. Adjust your preference.
Chrome (Desktop)
  1. Click the padlock icon in the address bar.
  2. Select Site settings.
  3. Find Notifications and adjust your preference.