一、引入依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>6.0.5</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.0.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>3.0.6</version>
</dependency>
二、构造配置类
@Configuration
@Data
public class MqttConfig {
@Value("${mqtt.host}")
private String Host; //emqx服务器的协议地址, 如tcp://localhost:1883
@Value("${mqtt.clientid}")
private String ClientId; //连接到emqx服务器时的clientId, 可自定义但不能重复
@Value("${mqtt.username}")
private String Username; //登录到emqx时的用户名
@Value("${mqtt.password}")
private String Password; //登录到emqx时的密码
@Value("${mqtt.topic}")
private String Topic; //连接到emqx时默认订阅的主题, 可缺省
@Value("${mqtt.timeout}")
private Integer Timeout; //超时时间
@Value("${mqtt.keepalive}")
private Integer Keepalive; //心跳时间
@Value("${mqtt.connectionTimeout}")
private Integer ConnectionTimeout; //连接超时时间
@Value("${mqtt.cleansession}")
private Boolean Cleansession; //与emqx服务器断开连接后是否保留session, 设置为false在重连时可获取掉线期间的消息(自动重新订阅需要额外配置)
}
三、建立连接
@Component
public class MqttConnectUtil {
MqttConfig mqttPublishConfig;
@Autowired
public MqttConnectUtil(MqttConfig mqttPublishConfig) {
this.mqttPublishConfig = mqttPublishConfig;
}
public MqttConnectOptions getOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(mqttPublishConfig.getCleansession());
mqttConnectOptions.setUserName(mqttPublishConfig.getUsername());
mqttConnectOptions.setPassword(mqttPublishConfig.getPassword().toCharArray());
mqttConnectOptions.setConnectionTimeout(mqttPublishConfig.getConnectionTimeout());
mqttConnectOptions.setKeepAliveInterval(mqttPublishConfig.getKeepalive());
return mqttConnectOptions;
}
public MqttConnectOptions getOptions(MqttConnectOptions mqttConnectOptions) {
mqttConnectOptions.setCleanSession(mqttPublishConfig.getCleansession());
mqttConnectOptions.setUserName(mqttPublishConfig.getUsername());
mqttConnectOptions.setPassword(mqttPublishConfig.getPassword().toCharArray());
mqttConnectOptions.setConnectionTimeout(mqttPublishConfig.getConnectionTimeout());
mqttConnectOptions.setKeepAliveInterval(mqttPublishConfig.getKeepalive());
return mqttConnectOptions;
}
}
四、定义回调函数
@Slf4j
@Service
public class CallbackServiceImpl implements MqttCallback {
MqttServiceImpl mqttService;
@Autowired
public CallbackServiceImpl(MqttServiceImpl mqttService) {
this.mqttService = mqttService;
}
@Override
public void connectionLost(Throwable throwable) { //当连接丢失时执行
log.warn("--- Connection Paused, Trying ReConnect ---");
mqttService.subscribeConnect(); //重新连接
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) { //当收到消息时执行
log.debug("收到消息主题: " + s + " Qos: " + mqttMessage.getQos() + " Payload: " + new String(mqttMessage.getPayload()) + " Id: " + mqttMessage.getId());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //当发出Qos等级不为0的消息时执行
log.debug("Qos回调状态: " + iMqttDeliveryToken.isComplete() + " " + Arrays.toString(iMqttDeliveryToken.getTopics()));
}
}
五、MqttService的构造
一般来说,需要构造以下六个方法:
public interface MqttService {
MqttClient publishConnect(); //建立推送连接
MqttClient subscribeConnect(); //建立订阅连接
void sendMessage(MqttTopic mqttTopic, MqttMessage mqttMessage); //发出消息
void publish(String mqttTopic, String mqttPayload, int qos); //调用sendMessage()发送消息
void subscribe(String topic, int qos); //订阅主题
void unsubscribe(String topic); //取消订阅主题
}
六、MqttService的实现
@Data
@Slf4j
@Service
public class MqttServiceImpl implements MqttService {
private MqttClient subscribeClient;
private MqttClient publishClient;
private MqttTopic mqttTopic;
private MqttMessage mqttMessage;
MqttConnectUtil mqttConnectUtil;
MqttConfig mqttConfig;
@Autowired
public MqttServiceImpl(MqttConnectUtil mqttConnectUtil, MqttConfig mqttConfig) {
this.mqttConnectUtil = mqttConnectUtil;
this.mqttConfig = mqttConfig;
}
@Override
public MqttClient publishConnect() {
try {
if (publishClient == null) { //推送连接为null时构造
publishClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientId() + "_publish", new MemoryPersistence());
}
MqttConnectOptions mqttConnectOptions = mqttConnectUtil.getOptions(); //读取配置
if (!publishClient.isConnected()) {
publishClient.connect(mqttConnectOptions);
log.debug("--- Publish Connect Success ---");
}
else { //失败时重连
publishClient.disconnect();
publishClient.connect(mqttConnectUtil.getOptions(mqttConnectOptions));
log.debug("--- Publish ReConnect Success ---");
}
}
catch (MqttException e) {
log.debug(e.toString());
log.error("--- Publish Connect Error ---");
}
return publishClient;
}
@Override
public MqttClient subscribeConnect() {
try {
if (subscribeClient == null) { //订阅连接为null时构造
subscribeClient = new MqttClient(mqttConfig.getHost(), mqttConfig.getClientId() + "_subscribe", new MemoryPersistence());
subscribeClient.setCallback(new CallbackServiceImpl(MqttServiceImpl.this)); //配置回调方法
}
MqttConnectOptions mqttConnectOptions = mqttConnectUtil.getOptions(); //读取配置
if (!subscribeClient.isConnected()) {
subscribeClient.connect(mqttConnectOptions);
log.debug("--- Subscribe Subscribe Success ---");
}
else { //失败时重连
subscribeClient.disconnect();
subscribeClient.connect(mqttConnectUtil.getOptions(mqttConnectOptions));
log.debug("--- Subscribe Subscribe Success ---");
}
}
catch (MqttException e) {
log.debug(e.toString());
log.error("--- Subscribe Connect Error ---");
}
return subscribeClient;
}
@Override
public void sendMessage(MqttTopic mqttTopic, MqttMessage mqttMessage) { //发出消息
try {
MqttDeliveryToken mqttDeliveryToken = mqttTopic.publish(mqttMessage); //推送消息, 返回一个MqttDeliveryToken状态, 便于了解发送状态
mqttDeliveryToken.waitForCompletion(); //等待发送完成
if (mqttDeliveryToken.isComplete()) {
log.debug("给主题: " + mqttTopic.getName() + " 发送消息: " + new String(mqttMessage.getPayload()) + " 成功");
}
else {
log.info("给主题: " + mqttTopic.getName() + " 发送消息: " + new String(mqttMessage.getPayload()) + " 失败");
}
}
catch (MqttException e) {
log.debug(e.toString());
log.error("给主题: " + mqttTopic.getName() + " 发送消息: " + new String(mqttMessage.getPayload()) + " 错误");
}
}
@Override
public void publish(String mqttTopic, String mqttPayload, int qos) { //推送消息(主题, 负载, QOS)
this.publishClient = publishConnect();
this.setMqttTopic(this.publishClient.getTopic(mqttTopic)); //设置主题
this.setMqttMessage(new MqttMessage()); //设置消息
this.getMqttMessage().setQos(qos); //设置消息QOS等级
this.getMqttMessage().setRetained(false); //各有利弊, true会导致设备重连后消息重复
this.getMqttMessage().setPayload(mqttPayload.getBytes()); //设置消息负载
this.sendMessage(this.getMqttTopic(), this.getMqttMessage());
}
@Override
public void subscribe(String topic, int qos) { //订阅主题(主题, QOS)
this.subscribeClient = subscribeConnect();
try {
subscribeClient.subscribe(topic, qos);
log.debug("订阅主题: " + topic + " 成功");
}
catch (MqttException e) {
log.debug(e.toString());
log.error("订阅主题 " + topic + " 失败");
}
}
@Override
public void unsubscribe(String topic) { //取消订阅主题
this.subscribeClient = subscribeConnect();
try {
subscribeClient.unsubscribe(topic);
log.debug("取消订阅主题 " + topic + " 成功");
}
catch (MqttException e) {
log.debug(e.toString());
log.error("取消订阅主题 " + topic + " 失败");
}
}
}