acautomaton
acautomaton
发布于 2023-06-24 / 40 阅读
0
0

基于 Springboot 3.0.x 使用MQTT协议连接到EMQX服务器

一、引入依赖

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>6.0.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>6.0.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
            <version>3.0.6</version>
        </dependency>

二、构造配置类

@Configuration
@Data
public class MqttConfig {
    @Value("${mqtt.host}")
    private String Host;  //emqx服务器的协议地址, 如tcp://localhost:1883
    @Value("${mqtt.clientid}")
    private String ClientId;  //连接到emqx服务器时的clientId, 可自定义但不能重复
    @Value("${mqtt.username}")
    private String Username;  //登录到emqx时的用户名
    @Value("${mqtt.password}")
    private String Password;  //登录到emqx时的密码
    @Value("${mqtt.topic}")
    private String Topic;  //连接到emqx时默认订阅的主题, 可缺省
    @Value("${mqtt.timeout}")
    private Integer Timeout;  //超时时间
    @Value("${mqtt.keepalive}")
    private Integer Keepalive;  //心跳时间
    @Value("${mqtt.connectionTimeout}")
    private Integer ConnectionTimeout;  //连接超时时间
    @Value("${mqtt.cleansession}")
    private Boolean Cleansession;  //与emqx服务器断开连接后是否保留session, 设置为false在重连时可获取掉线期间的消息(自动重新订阅需要额外配置)
}

三、建立连接

@Component
public class MqttConnectUtil {
    MqttConfig mqttPublishConfig;
    @Autowired
    public MqttConnectUtil(MqttConfig mqttPublishConfig) {
        this.mqttPublishConfig = mqttPublishConfig;
    }

    public MqttConnectOptions getOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(mqttPublishConfig.getCleansession());
        mqttConnectOptions.setUserName(mqttPublishConfig.getUsername());
        mqttConnectOptions.setPassword(mqttPublishConfig.getPassword().toCharArray());
        mqttConnectOptions.setConnectionTimeout(mqttPublishConfig.getConnectionTimeout());
        mqttConnectOptions.setKeepAliveInterval(mqttPublishConfig.getKeepalive());
        return mqttConnectOptions;
    }

    public MqttConnectOptions getOptions(MqttConnectOptions mqttConnectOptions) {
        mqttConnectOptions.setCleanSession(mqttPublishConfig.getCleansession());
        mqttConnectOptions.setUserName(mqttPublishConfig.getUsername());
        mqttConnectOptions.setPassword(mqttPublishConfig.getPassword().toCharArray());
        mqttConnectOptions.setConnectionTimeout(mqttPublishConfig.getConnectionTimeout());
        mqttConnectOptions.setKeepAliveInterval(mqttPublishConfig.getKeepalive());
        return mqttConnectOptions;
    }
}

四、定义回调函数

@Slf4j
@Service
public class CallbackServiceImpl implements MqttCallback {
    MqttServiceImpl mqttService;

    @Autowired
    public CallbackServiceImpl(MqttServiceImpl mqttService) {
        this.mqttService = mqttService;
    }

    @Override
    public void connectionLost(Throwable throwable) {  //当连接丢失时执行
        log.warn("--- Connection Paused, Trying ReConnect ---");
        mqttService.subscribeConnect();  //重新连接
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {  //当收到消息时执行
        log.debug("收到消息主题: " + s + " Qos: " + mqttMessage.getQos() + " Payload: " + new String(mqttMessage.getPayload()) + " Id: " + mqttMessage.getId());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {   //当发出Qos等级不为0的消息时执行
        log.debug("Qos回调状态: " + iMqttDeliveryToken.isComplete() + " " + Arrays.toString(iMqttDeliveryToken.getTopics()));
    }
}

五、MqttService的构造

一般来说,需要构造以下六个方法:

public interface MqttService {
    MqttClient publishConnect();  //建立推送连接
    MqttClient subscribeConnect();  //建立订阅连接
    void sendMessage(MqttTopic mqttTopic, MqttMessage mqttMessage);  //发出消息
    void publish(String mqttTopic, String mqttPayload, int qos);  //调用sendMessage()发送消息
    void subscribe(String topic, int qos);  //订阅主题
    void unsubscribe(String topic);  //取消订阅主题
}

六、MqttService的实现

@Data
@Slf4j
@Service
public class MqttServiceImpl implements MqttService {
    private MqttClient subscribeClient;
    private MqttClient publishClient;

    private MqttTopic mqttTopic;
    private MqttMessage mqttMessage;

    MqttConnectUtil mqttConnectUtil;
    MqttConfig mqttConfig;

    @Autowired
    public MqttServiceImpl(MqttConnectUtil mqttConnectUtil, MqttConfig mqttConfig) {
        this.mqttConnectUtil = mqttConnectUtil;
        this.mqttConfig = mqttConfig;
    }

    @Override
    public MqttClient publishConnect() {
        try {
            if (publishClient == null) {  //推送连接为null时构造
                publishClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientId() + "_publish", new MemoryPersistence());
            }
            MqttConnectOptions mqttConnectOptions = mqttConnectUtil.getOptions();  //读取配置
            if (!publishClient.isConnected()) {
                publishClient.connect(mqttConnectOptions);
                log.debug("--- Publish Connect Success ---");
            }
            else {  //失败时重连
                publishClient.disconnect();
                publishClient.connect(mqttConnectUtil.getOptions(mqttConnectOptions));
                log.debug("--- Publish ReConnect Success ---");
            }
        }
        catch (MqttException e) {
            log.debug(e.toString());
            log.error("--- Publish Connect Error ---");
        }
        return publishClient;
    }

    @Override
    public MqttClient subscribeConnect() {
        try {
            if (subscribeClient == null) {  //订阅连接为null时构造
                subscribeClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientId() + "_subscribe", new MemoryPersistence());
                subscribeClient.setCallback(new CallbackServiceImpl(MqttServiceImpl.this));  //配置回调方法
            }
            MqttConnectOptions mqttConnectOptions = mqttConnectUtil.getOptions();  //读取配置
            if (!subscribeClient.isConnected()) {
                subscribeClient.connect(mqttConnectOptions);
                log.debug("--- Subscribe Subscribe Success ---");
            }
            else {  //失败时重连
                subscribeClient.disconnect();
                subscribeClient.connect(mqttConnectUtil.getOptions(mqttConnectOptions));
                log.debug("--- Subscribe Subscribe Success ---");
            }
        }
        catch (MqttException e) {
            log.debug(e.toString());
            log.error("--- Subscribe Connect Error ---");
        }
        return subscribeClient;
    }

    @Override
    public void sendMessage(MqttTopic mqttTopic, MqttMessage mqttMessage) {  //发出消息
        try {
            MqttDeliveryToken mqttDeliveryToken = mqttTopic.publish(mqttMessage);  //推送消息, 返回一个MqttDeliveryToken状态, 便于了解发送状态
            mqttDeliveryToken.waitForCompletion();  //等待发送完成

            if (mqttDeliveryToken.isComplete()) {
                log.debug("给主题: " + mqttTopic.getName() + " 发送消息: " + new String(mqttMessage.getPayload()) + " 成功");
            }
            else {
                log.info("给主题: " + mqttTopic.getName() + " 发送消息: " + new String(mqttMessage.getPayload()) + " 失败");
            }
        }
        catch (MqttException e) {
            log.debug(e.toString());
            log.error("给主题: " + mqttTopic.getName() + " 发送消息: " + new String(mqttMessage.getPayload()) + " 错误");
        }
    }

    @Override
    public void publish(String mqttTopic, String mqttPayload, int qos) {  //推送消息(主题, 负载, QOS)
        this.publishClient = publishConnect();
        this.setMqttTopic(this.publishClient.getTopic(mqttTopic));  //设置主题
        this.setMqttMessage(new MqttMessage());  //设置消息
        this.getMqttMessage().setQos(qos);  //设置消息QOS等级
        this.getMqttMessage().setRetained(false);  //各有利弊, true会导致设备重连后消息重复
        this.getMqttMessage().setPayload(mqttPayload.getBytes());  //设置消息负载

        this.sendMessage(this.getMqttTopic(), this.getMqttMessage());
    }

    @Override
    public void subscribe(String topic, int qos) {  //订阅主题(主题, QOS)
        this.subscribeClient = subscribeConnect();
        try {
            subscribeClient.subscribe(topic, qos);
            log.debug("订阅主题: " + topic + " 成功");
        }
        catch (MqttException e) {
            log.debug(e.toString());
            log.error("订阅主题 " + topic + " 失败");
        }
    }

    @Override
    public void unsubscribe(String topic) {  //取消订阅主题
        this.subscribeClient = subscribeConnect();
        try {
            subscribeClient.unsubscribe(topic);
            log.debug("取消订阅主题 " + topic + " 成功");
        }
        catch (MqttException e) {
            log.debug(e.toString());
            log.error("取消订阅主题 " + topic + " 失败");
        }
    }
}


评论