This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop_mqtt5.0
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/develop_mqtt5.0 by this push:
     new 5397e58  Support some simple feature (#254)
5397e58 is described below

commit 5397e58cb7e2831af52a5725b6c4bfd682da7a09
Author: Dongyuan Pan <dongyuanp...@gmail.com>
AuthorDate: Thu May 23 10:29:09 2024 +0800

    Support some simple feature (#254)
    
    * Support user properties
    
    * Support ContentType and payloadFormatIndicator
    
    * add CheckPacket
    
    * delete check packet
    
    * fix bug: reset topic alias
    
    * fix bug: reset topic alias
    
    * support SUBSCRIPTION_IDENTIFIER, RESPONSE_TOPIC, CORRELATION_DATA,
CONTENT_TYPE
---
 .../rocketmq/mqtt/common/util/MessageUtil.java     |   2 +-
 .../mqtt/common/test/util/TestMessageUtil.java     |  57 +++++++++++
 .../rocketmq/mqtt/cs/session/infly/PushAction.java | 104 ++++++++++++++++-----
 .../mqtt5/processor/PublishProcessor5.java         |   1 +
 .../rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java |   2 +-
 .../rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java |  15 +++
 6 files changed, 157 insertions(+), 24 deletions(-)

diff --git 
a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java
 
b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java
index 5362445..6b86468 100644
--- 
a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java
+++ 
b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/MessageUtil.java
@@ -108,7 +108,7 @@ public class MessageUtil {
                 message.putUserProperty(Message.propertyCorrelationData, new 
String(correlationData.value(), StandardCharsets.UTF_8));
             }
 
-            // User Properties
+            // The user properties of publish packets need to be stored, and 
when pushing, they need to be brought with them
             List<MqttProperties.UserProperty> userProperties = 
(List<MqttProperties.UserProperty>) 
mqttProperties.getProperties(USER_PROPERTY.value());
             List<MqttProperties.StringPair> userPropertyList = new 
ArrayList<>();
             for (MqttProperties.UserProperty userProperty : userProperties) {
diff --git 
a/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java
 
b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java
index 7318210..6788dbd 100644
--- 
a/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java
+++ 
b/mqtt-common/src/test/java/org/apache/rocketmq/mqtt/common/test/util/TestMessageUtil.java
@@ -17,29 +17,50 @@
 
 package org.apache.rocketmq.mqtt.common.test.util;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttProperties;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.util.CharsetUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.mqtt.common.model.Message;
 import org.apache.rocketmq.mqtt.common.util.MessageUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.USER_PROPERTY;
 import static org.apache.rocketmq.mqtt.common.util.MessageUtil.EMPTYSTRING;
 import static 
org.apache.rocketmq.mqtt.common.util.MessageUtil.dealEmptyMessage;
 import static 
org.apache.rocketmq.mqtt.common.util.MessageUtil.removeRetainedFlag;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
 
 public class TestMessageUtil {
 
@@ -119,4 +140,40 @@ public class TestMessageUtil {
         newEmptyMessage.payload().readBytes(newBody);
         Assert.assertArrayEquals(EMPTYSTRING.getBytes(), newBody);
     }
+
+    @Test
+    public void TestMqtt5Message() {
+
+        MqttProperties props = new MqttProperties();
+        props.add(new MqttProperties.UserProperty("isSecret", "true"));
+        props.add(new MqttProperties.UserProperty("tag", "firstTag"));
+        props.add(new MqttProperties.UserProperty("tag", "secondTag"));
+
+        props.add(new 
MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 100));
+        props.add(new 
MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 101));
+
+        MqttFixedHeader mqttFixedHeader = new 
MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 
1);
+        MqttPublishVariableHeader variableHeader = new 
MqttPublishVariableHeader("test", 0, props);
+        ByteBuf payload = 
Unpooled.copiedBuffer("test".getBytes(StandardCharsets.UTF_8));
+        MqttPublishMessage publishMessage = new 
MqttPublishMessage(mqttFixedHeader, variableHeader, payload);
+        Message message = MessageUtil.toMessage(publishMessage);
+
+        String mqtt5UserProperties = 
message.getUserProperty(Message.propertyMqtt5UserProperty);
+        MqttPublishMessage newPublishMessage = null;
+
+        if (StringUtils.isNotBlank(mqtt5UserProperties)) {
+            ArrayList<MqttProperties.StringPair> userProperties = 
JSON.parseObject(mqtt5UserProperties,
+                    new TypeReference<ArrayList<MqttProperties.StringPair>>() 
{}
+            );
+            MqttProperties newProps = new MqttProperties();
+            newProps.add(new MqttProperties.UserProperties(userProperties));
+            MqttPublishVariableHeader newVariableHeader = new 
MqttPublishVariableHeader("test", 0, props);
+            newPublishMessage = new MqttPublishMessage(mqttFixedHeader, 
variableHeader, payload);
+        }
+
+        MqttProperties checkProps = 
newPublishMessage.variableHeader().properties();
+        Assert.assertEquals("true", 
((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(0).value()).value);
+        Assert.assertEquals("firstTag", 
((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(1).value()).value);
+        Assert.assertEquals("secondTag", 
((MqttProperties.StringPair)checkProps.getProperties(USER_PROPERTY.value()).get(2).value()).value);
+    }
 }
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
index ca044ac..feccc55 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java
@@ -17,10 +17,14 @@
 
 package org.apache.rocketmq.mqtt.cs.session.infly;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.handler.codec.mqtt.MqttEncoder;
 import io.netty.handler.codec.mqtt.MqttProperties;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttVersion;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
@@ -39,8 +43,13 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.List;
 
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC;
+import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
 import static java.lang.Math.min;
 import static java.util.Objects.hash;
@@ -95,9 +104,9 @@ public class PushAction {
         try {
             if (session.isClean()) {
                 if (message.getStoreTimestamp() > 0 &&
-                    message.getStoreTimestamp() < session.getStartTime()) {
+                        message.getStoreTimestamp() < session.getStartTime()) {
                     logger.warn("old msg:{},{},{},{}", session.getClientId(), 
message.getMsgId(),
-                        message.getStoreTimestamp(), session.getStartTime());
+                            message.getStoreTimestamp(), 
session.getStartTime());
                     rollNext(session, mqttId);
                     return;
                 }
@@ -172,28 +181,37 @@ public class PushAction {
                 data = MqttMessageFactory.buildPublishMessage(topicName, 
message.getPayload(), qos, retained, mqttId);
                 break;
             case MQTT_5:
+                // add content type
+                if 
(StringUtils.isNotBlank(message.getUserProperty(Message.propertyContentType))) {
+                    mqttProperties.add(new 
MqttProperties.StringProperty(CONTENT_TYPE.value(), 
message.getUserProperty(Message.propertyContentType)));
+                }
+
+                // add Response Topic
+                if 
(StringUtils.isNotBlank(message.getUserProperty(Message.propertyResponseTopic)))
 {
+                    mqttProperties.add(new 
MqttProperties.StringProperty(RESPONSE_TOPIC.value(), 
message.getUserProperty(Message.propertyResponseTopic)));
+                }
+
+                // add Correlation Data
+                if 
(StringUtils.isNotBlank(message.getUserProperty(Message.propertyCorrelationData)))
 {
+                    mqttProperties.add(new 
MqttProperties.StringProperty(CORRELATION_DATA.value(), 
message.getUserProperty(Message.propertyCorrelationData)));
+                }
+
+                // process publish user properties
+                processUserProperties(message, mqttProperties);
+
+                // process subscription identifier
+                if (subscription.getSubscriptionIdentifier() > 0) {
+                    mqttProperties.add(new 
MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 
subscription.getSubscriptionIdentifier()));
+                }
+
                 // TODO retain flag should be set by subscription option
-                int topicAlias = ChannelInfo.getTopicAliasMaximum(channel);
-                if (topicAlias > 0) {
-                    String topicNameTmp = "";
-                    if (ChannelInfo.getServerTopicAlias(channel, topicName) == 
null) {
-                        // allocate topic alias
-                        int allocateAlias = genServerTopicAlias(topicName, 
topicAlias);
-
-                        if (ChannelInfo.getServerAliasTopic(channel, 
allocateAlias) != null) {
-                            // conflict, reset topic <-> alias
-                            topicNameTmp = topicName;
-                        }
-
-                        ChannelInfo.setServerTopicAlias(channel, topicName, 
allocateAlias);
-                        ChannelInfo.setServerAliasTopic(channel, 
allocateAlias, topicName);
-                    }
+                boolean isRetained = message.isRetained();
 
-                    mqttProperties.add(new 
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), 
ChannelInfo.getServerTopicAlias(channel, topicName)));
-                    data = 
MqttMessageFactory.buildMqtt5PublishMessage(topicNameTmp, message.getPayload(), 
qos, retained, mqttId, mqttProperties);
+                // process topic alias
+                if (!processTopicAlias(channel, topicName, mqttProperties)) {
+                    data = MqttMessageFactory.buildMqtt5PublishMessage("", 
message.getPayload(), qos, isRetained, mqttId, mqttProperties);
                 } else {
-                    // no alias
-                    data = 
MqttMessageFactory.buildMqtt5PublishMessage(topicName, message.getPayload(), 
qos, retained, mqttId, mqttProperties);
+                    data = 
MqttMessageFactory.buildMqtt5PublishMessage(topicName, message.getPayload(), 
qos, isRetained, mqttId, mqttProperties);
                 }
                 break;
             default:
@@ -207,12 +225,54 @@ public class PushAction {
                 message.setRetry(message.getRetry() + 1);
                 logger.warn("retryPush:{},{},{}", session.getClientId(), 
message.getMsgId(), message.getRetry());
             } else if (subscription.isShare()) {
-                String lmqTopic = MixAll.LMQ_PREFIX + 
StringUtils.replace(message.getOriginTopic(), "/","%");
+                String lmqTopic = MixAll.LMQ_PREFIX + 
StringUtils.replace(message.getOriginTopic(), "/", "%");
                 lmqQueueStore.popAck(lmqTopic, subscription.getSharedName(), 
message);
             }
         });
     }
 
+    public void processUserProperties(Message message, MqttProperties 
mqttProperties) {
+        String mqtt5UserProperties = 
message.getUserProperty(Message.propertyMqtt5UserProperty);
+        if (StringUtils.isNotBlank(mqtt5UserProperties)) {
+            ArrayList<MqttProperties.StringPair> userProperties = 
JSON.parseObject(mqtt5UserProperties,
+                    new TypeReference<ArrayList<MqttProperties.StringPair>>() {
+                    }
+            );
+            mqttProperties.add(new 
MqttProperties.UserProperties(userProperties));
+        }
+    }
+
+    /**
+     * process topic alias
+     * @param channel
+     * @param topicName
+     * @param mqttProperties
+     * @return true: conflict when allocated topic alias
+     */
+    public boolean processTopicAlias(Channel channel, String topicName, 
MqttProperties mqttProperties) {
+        int topicAlias = ChannelInfo.getTopicAliasMaximum(channel);
+        boolean conflict = false;
+
+        if (topicAlias > 0) {
+            if (ChannelInfo.getServerTopicAlias(channel, topicName) == null) {
+                // allocate topic alias
+                int allocateAlias = genServerTopicAlias(topicName, topicAlias);
+
+                if (ChannelInfo.getServerAliasTopic(channel, allocateAlias) != 
null) {
+                    // conflict, client will reset topic <-> alias
+                    conflict = true;
+                }
+
+                ChannelInfo.setServerTopicAlias(channel, topicName, 
allocateAlias);
+                ChannelInfo.setServerAliasTopic(channel, allocateAlias, 
topicName);
+            }
+
+            // topic has allocated topic alias,just set to mqttProperties
+            mqttProperties.add(new 
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), 
ChannelInfo.getServerTopicAlias(channel, topicName)));
+        }
+        return conflict;
+    }
+
     public int genServerTopicAlias(String topicName, int topicAliasMaximum) {
         return hash(topicName) % topicAliasMaximum + 1;
     }
diff --git 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java
 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java
index 2cdee66..4c3bb89 100644
--- 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java
+++ 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt5/processor/PublishProcessor5.java
@@ -72,6 +72,7 @@ public class PublishProcessor5 implements UpstreamProcessor5 {
     public CompletableFuture<StoreResult> put(MqttMessageUpContext context, 
MqttMessage mqttMessage) {
         MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) 
mqttMessage;
 
+        // process topic alias
         final MqttPublishVariableHeader variableHeaderTmp = 
mqttPublishMessage.variableHeader();
         MqttProperties mqttProperties = variableHeaderTmp.properties();
         if (mqttProperties != null && context.getClientTopicAliasMap() != 
null) {
diff --git 
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java
 
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java
index f40dd94..c139d40 100644
--- 
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java
+++ 
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Consumer.java
@@ -62,7 +62,7 @@ public class Mqtt5Consumer {
                 try {
                     String payload = new String(message.getPayload());
                     String[] ss = payload.split("_");
-                    System.out.println(now() + "receive:" + topic + "," + 
payload);
+                    System.out.println(now() + "receive:" + topic + "," + 
payload + ", properties: " + message.getProperties());
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
diff --git 
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java
 
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java
index 4968f9e..4ae1876 100644
--- 
a/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java
+++ 
b/mqtt-example/src/main/java/org/apache/rocketmq/mqtt/example/mqtt5/Mqtt5Producer.java
@@ -27,12 +27,15 @@ import 
org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
 import org.eclipse.paho.mqttv5.common.MqttException;
 import org.eclipse.paho.mqttv5.common.MqttMessage;
 import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.eclipse.paho.mqttv5.common.packet.UserProperty;
 
 import java.nio.charset.StandardCharsets;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 public class Mqtt5Producer {
     public static void main(String[] args) throws InterruptedException, 
MqttException, NoSuchAlgorithmException, InvalidKeyException {
@@ -82,6 +85,18 @@ public class Mqtt5Producer {
         for (int i = 0; i < 1000; i++) {
             String msg = "r1_" + System.currentTimeMillis() + "_" + i;
             MqttMessage message = new 
MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
+
+            // user properties
+            MqttProperties mqttProperties = new MqttProperties();
+            List<UserProperty> userProperties = new ArrayList<>();
+            userProperties.add(new UserProperty("tag", "r1"));
+            userProperties.add(new UserProperty("tag", "r11"));
+            mqttProperties.setUserProperties(userProperties);
+
+            // content type
+            mqttProperties.setContentType("text/plain");
+            message.setProperties(mqttProperties);
+
             message.setQos(1);
             String mqttSendTopic = firstTopic + "/r1";
             mqttClient.publish(mqttSendTopic, message);

Reply via email to