MQTT Usage

2025-05-16
5 min read

Introduction

MQTT is a standard messaging protocol for the Internet of Things (IoT). Today it is used in a wide variety of industries, including automotive, manufacturing, telecommunications, and oil and gas.

MQTT Publish / Subscribe Architecture
[Image: MQTT publish/subscribe architecture]

MQTT broker/server

MQTT brokers receive published messages and dispatch them to the subscribing MQTT clients. Each MQTT message carries a topic; clients subscribe to topics, and the broker uses its subscription lists to determine which clients receive each message.

There are many MQTT broker implementations, such as EMQX and HiveMQ. We chose EMQX because it is officially described as the world’s most scalable and reliable MQTT platform, designed for high-performance, reliable, and secure IoT data infrastructure. In our experience, it lives up to that description. You can find it here.

MQTT Wildcards

When a client subscribes to a topic with a multi-level wildcard, it receives all messages of a topic that begins with the pattern before the wildcard character, regardless of the length or depth of the topic. If the topic is specified as “#” alone, the client receives all messages sent to the MQTT broker.

MQTT Wildcard – Single Level: +
The single-level wildcard is represented by the plus symbol (+) and allows the replacement of a single topic level. By subscribing to a topic with a single-level wildcard, any topic that contains an arbitrary string in place of the wildcard will be matched.

[Image: single-level wildcard example]

MQTT Wildcard – Multi Level: #
The multi-level wildcard covers multiple topic levels. It is represented by the hash symbol (#) and must be placed as the last character in the topic, preceded by a forward slash.
[Image: multi-level wildcard example]
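
The two wildcard rules above can be sketched as a small matching function. This is only an illustration of the rules, not the code of EMQX or any other broker; the class name TopicFilterMatcher and its matches method are hypothetical, and the sketch ignores special handling of topics that start with $.

```java
import java.util.Objects;

/** Minimal sketch of MQTT topic-filter matching (hypothetical helper, not a library API). */
final class TopicFilterMatcher {

    private TopicFilterMatcher() {}

    /** Returns true if the concrete topic name matches the subscription filter. */
    static boolean matches(String filter, String topic) {
        Objects.requireNonNull(filter, "filter");
        Objects.requireNonNull(topic, "topic");
        // Limit -1 keeps trailing empty levels, so "a/" has two levels: "a" and "".
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: must be the last level; it then matches
                // the parent level and everything below it.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            // Single-level wildcard accepts any string at exactly this level.
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        // Without "#", filter and topic must have the same number of levels.
        return f.length == t.length;
    }
}
```

With this sketch, matches("HA/+/state", "HA/lamp/state") is true, matches("HA/+/state", "HA/room/lamp/state") is false, and matches("HA/#", "HA/room/lamp/state") is true.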

MQTT QoS (Quality of Service)

MQTT implements 3 quality of service levels for agreement between the sender and receiver:

  1. At most once (0)
  2. At least once (1)
  3. Exactly once (2)

These QoS levels allow for more reliable IoT applications, since the underlying messaging infrastructure can adapt to unreliable network conditions. You can find the details here.

int qos = 1;
client.subscribe("$share/group/HA/state", qos);
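
As a minimal sketch of the three levels, the enum below maps each level to its numeric value. QosLevel is a hypothetical type, not part of the Paho client. The delivered method assumes the usual MQTT rule that a message reaches a subscriber at the lower of the QoS it was published with and the QoS granted to the subscription.

```java
/** Hypothetical enum for the three MQTT QoS levels (not a Paho type). */
enum QosLevel {
    AT_MOST_ONCE(0),
    AT_LEAST_ONCE(1),
    EXACTLY_ONCE(2);

    private final int value;

    QosLevel(int value) {
        this.value = value;
    }

    /** Numeric value as passed to calls such as client.subscribe(topic, qos). */
    int value() {
        return value;
    }

    /** Looks up the level for a numeric QoS value. */
    static QosLevel of(int value) {
        for (QosLevel q : values()) {
            if (q.value == value) {
                return q;
            }
        }
        throw new IllegalArgumentException("QoS must be 0, 1 or 2: " + value);
    }

    /** Assumed rule: delivery uses the lower of the published and the granted QoS. */
    static QosLevel delivered(QosLevel published, QosLevel granted) {
        return published.value <= granted.value ? published : granted;
    }
}
```

Under that assumption, a message published with QoS 2 to a subscription made with QoS 1, as in the snippet above, would be delivered at least once.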

MQTT Shared Subscriptions

Shared subscriptions, a core feature of MQTT 5, enable multiple MQTT clients to share a single subscription on the broker. In essence, this feature allows messages on a topic to be distributed among multiple clients, thereby improving load balancing and fault tolerance in an MQTT system. If you are building a distributed system, this feature is indispensable.

[Image: shared subscription example]
To use MQTT shared subscriptions, you just need to add $share/group/ in front of the topic name in the subscribe call, where group is the name of the share group. Your distributed application instances will then receive messages alternately.

client.subscribe("$share/group/HA/state", 1);
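
To make the $share/group/ prefix more concrete, here is a rough sketch of how such a subscription string splits into a group name and a topic filter, and how messages could alternate between group members. The class SharedSubscriptionSketch and its nested types are hypothetical. Round-robin dispatch is an assumption made for illustration; the actual distribution strategy is decided by the broker.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a "$share/{group}/{filter}" subscription (hypothetical helper, not broker code). */
final class SharedSubscriptionSketch {

    private SharedSubscriptionSketch() {}

    /** The two parts of a shared subscription string. */
    static final class Parsed {
        final String group;
        final String filter;

        Parsed(String group, String filter) {
            this.group = group;
            this.filter = filter;
        }
    }

    /** Splits "$share/group/HA/state" into group "group" and filter "HA/state". */
    static Parsed parse(String subscription) {
        String prefix = "$share/";
        if (!subscription.startsWith(prefix)) {
            throw new IllegalArgumentException("not a shared subscription: " + subscription);
        }
        int slash = subscription.indexOf('/', prefix.length());
        if (slash < 0 || slash == prefix.length() || slash == subscription.length() - 1) {
            throw new IllegalArgumentException("expected $share/{group}/{filter}: " + subscription);
        }
        return new Parsed(subscription.substring(prefix.length(), slash),
                subscription.substring(slash + 1));
    }

    /** Assumed round-robin dispatcher: each message goes to exactly one group member. */
    static final class Group {
        private final List<String> members = new ArrayList<>();
        private int next = 0;

        void join(String clientId) {
            members.add(clientId);
        }

        /** Returns the client id that receives the next message. */
        String dispatch() {
            if (members.isEmpty()) {
                throw new IllegalStateException("no members in group");
            }
            String chosen = members.get(next);
            next = (next + 1) % members.size();
            return chosen;
        }
    }
}
```

With two members joined in the order app-1, app-2, three consecutive dispatch() calls in this sketch pick app-1, app-2, and app-1 again, which is the alternating behavior described above.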

MQTT Retained Messages

MQTT clients that subscribe to a new topic have no insight into when to expect the first message they will receive. However, an MQTT broker can store a retained message that can be sent immediately upon a new MQTT subscription. In this case, the MQTT client will receive at least one message upon subscribing to the topic.

This matters when reloading a backend application: whenever the MQTT client restarts, it will miss some messages, which leads to data inconsistency. Retained messages let your application get the latest state of its IoT devices as soon as it subscribes, so the data becomes consistent again immediately.

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // Log the retained flag, the topic and the payload of every incoming message.
    log.info("\n\t{}[MQTT.messageArrived]{}\n\t[Retained: {}]\n\t[{}]\n\t[{}]",
            AnsiColor.GREEN, AnsiColor.RESET,
            message.isRetained(), topic,
            new String(message.getPayload()));
}
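
The broker-side behavior described above can be pictured with a small in-memory sketch: the broker keeps the last retained payload per topic and hands it to any client that subscribes later. RetainedStoreSketch is a hypothetical class written for illustration, not how EMQX stores retained messages; it also assumes the standard rule that a retained message with an empty payload clears the stored one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory sketch of a broker's retained-message store (hypothetical, for illustration). */
final class RetainedStoreSketch {

    // Last retained payload per topic.
    private final Map<String, String> retained = new HashMap<>();

    /** Records a publish; only messages with the retain flag are stored. */
    void publish(String topic, String payload, boolean retain) {
        if (!retain) {
            return; // delivered to current subscribers only (not modeled here)
        }
        if (payload.isEmpty()) {
            retained.remove(topic); // empty retained payload clears the stored message
        } else {
            retained.put(topic, payload); // newer retained message replaces the older one
        }
    }

    /** Returns what a new subscriber to this exact topic would receive immediately, if anything. */
    Optional<String> onSubscribe(String topic) {
        return Optional.ofNullable(retained.get(topic));
    }
}
```

A client that subscribes after publish("HA/state", "on", true) gets "on" right away from onSubscribe("HA/state"), even though it was not connected when the message was published.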

Note: MQTT shared subscriptions do not work with retained messages.

MQTT Client for Java

Eclipse Paho Java Client is a good choice because it is stable. Note that there are two major versions, v3 and v5. To make sure you can use the latest MQTT v5 features, choose the v5 client. Because https://mvnrepository.com/search?q=Eclipse+Paho+Java+Client lists v3 at the top of the results, it is easy to pick the older one by mistake.

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>

Compatibility of retained messages and shared subscriptions

Testing showed that retained messages and shared subscriptions are not compatible: an application that uses a shared subscription does not receive the retained messages when it starts up.
To solve this, you can use two MQTT clients, one for the shared subscription and another for retained messages. Once no more retained messages arrive, the second client can be disconnected to release system resources.

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    if (!message.isRetained()) return;
    log.info("==== [HA_CLOUD_STATE_RETAINED_TOPIC.retained:{}]: {},{}", message.isRetained(), topic, new String(message.getPayload()));
    mqttConfig.handleMesage(topic, message);
    LAST_RETAIN_TIME = System.currentTimeMillis();
}

// Spring scheduled task: check every 60 seconds whether the timeout has elapsed
@Scheduled(fixedRate = 60000)
public void checkRetainTimeout() {
    if (client == null || !client.isConnected()) {
        return;
    }
    if (System.currentTimeMillis() - LAST_RETAIN_TIME > RETAIN_TIMEOUT_MS) {
        try {
            log.info("==== [mqtt.retained]: timed out waiting for new retained messages, unsubscribing!");
            client.unsubscribe(MqttTopicDefine.HA_CLOUD_STATE_RETAIN_TOPIC);
            client.disconnect();
            client.close();
        } catch (MqttException e) {
            // Pass the exception as the last argument so the stack trace is logged.
            log.error("==== [error]", e);
        }
    }
}
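
The snippet above writes LAST_RETAIN_TIME from the MQTT callback thread and reads it from the scheduler thread. One possible way to make that hand-off explicit is sketched below, under the assumption that a plain timestamp comparison is all that is needed. RetainIdleTracker is a hypothetical helper, not part of Paho or Spring, and the current time is passed in as a parameter so the logic can be checked without waiting.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper that tracks when the last retained message arrived. */
final class RetainIdleTracker {

    private final long timeoutMs;
    // AtomicLong makes the timestamp safely visible across the callback and scheduler threads.
    private final AtomicLong lastSeenMs;

    RetainIdleTracker(long timeoutMs, long nowMs) {
        this.timeoutMs = timeoutMs;
        this.lastSeenMs = new AtomicLong(nowMs);
    }

    /** Call from messageArrived whenever a retained message is handled. */
    void markReceived(long nowMs) {
        lastSeenMs.set(nowMs);
    }

    /** Call from the scheduled task; true means the retained-message client can be closed. */
    boolean isIdle(long nowMs) {
        return nowMs - lastSeenMs.get() > timeoutMs;
    }
}
```

In the code above, messageArrived would call markReceived(System.currentTimeMillis()) and checkRetainTimeout would test isIdle(System.currentTimeMillis()) before unsubscribing and disconnecting.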

Complete code

package org.jeecg.config;

import java.net.InetAddress;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MqttDefaultFilePersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.jeecg.common.base.BaseMap;
import org.jeecg.modules.iot.enhance.MqttTopicDefine;
import org.jeecg.modules.iot.sys.AnsiColor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
public class MqttConfig {

    @Value("${mqtt.broker}")
    private String broker;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Bean
    public MqttClient mqttClient() throws Exception {
        String clientId = InetAddress.getLocalHost().getHostName();
        MqttClient client = new MqttClient(broker, clientId, new MqttDefaultFilePersistence());
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setCleanStart(true);
        options.setSessionExpiryInterval(0L);
        options.setAutomaticReconnect(true);
        options.setUserName(username);
        options.setPassword(password.getBytes());
        client.setCallback(new MqttCallback() {
            @Override
            public void disconnected(MqttDisconnectResponse disconnectResponse) {
                log.error("MQTT connection lost: {}", disconnectResponse.getException().getMessage());
            }

            @Override
            public void mqttErrorOccurred(MqttException exception) {
                log.error("==== [mqttErrorOccurred]", exception);
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                if (message.isRetained()) return;
                log.info("\n\t{}[MQTT.messageArrived]{}\n\t[Retained: {}]\n\t[{}]\n\t[{}]",
                        AnsiColor.GREEN, AnsiColor.RESET,
                        message.isRetained(), topic,
                        new String(message.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
                // log.info("==== [message delivery complete]");
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                log.info("==== [connectComplete]");
                try {
                    client.subscribe(MqttTopicDefine.HA_CLOUD_HEART_TOPIC, 1);
                    client.subscribe(MqttTopicDefine.HA_CLOUD_STATE_TOPIC, 1);
                } catch (MqttException e) {
                    log.error("==== [error]", e);
                }
            }

            @Override
            public void authPacketArrived(int reasonCode, MqttProperties properties) {

            }
        });
        client.connect(options);
        return client;
    }

}