This is an automated email from the ASF dual-hosted git repository.

dongyuanpan pushed a commit to branch develop_mqtt5.0
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git

commit 4d2e294a2dab71bce84e375163f8400c9a378865
Merge: b210bd5 3cd226c
Author: dongyuan.pdy <dongyuanp...@gmail.com>
AuthorDate: Thu Jun 27 09:52:43 2024 +0800

    merge develop

 distribution/conf/spring.xml                       |   2 +-
 docs/quic.md                                       |  16 ++
 mqtt-common/pom.xml                                |   5 +
 .../rocketmq/mqtt/common/facade/LmqQueueStore.java |   9 ++
 .../mqtt/common/hook/AbstractUpstreamHook.java     |   5 +-
 .../rocketmq/mqtt/common/hook/EventHook.java       |  45 ++++++
 .../mqtt/common/hook/EventHookManager.java         |  42 +++++
 .../rocketmq/mqtt/common/model/ClientEvent.java    | 123 ++++++++++++++
 .../rocketmq/mqtt/common/model/Constants.java      |   6 +
 .../rocketmq/mqtt/common/model/EventType.java      |  23 +++
 .../rocketmq/mqtt/common/util/NetworkHelper.java   |  35 ++++
 .../mqtt/common/test/model/TestClientEvent.java    |  57 +++++++
 mqtt-cs/pom.xml                                    |  11 +-
 .../mqtt/cs/channel/AdaptiveTlsHandler.java        |  93 +++++++++++
 .../mqtt/cs/channel/DefaultChannelManager.java     |  33 +++-
 .../rocketmq/mqtt/cs/config/ConnectConf.java       |  32 ++++
 .../mqtt/cs/hook/EventHookManagerImpl.java         | 171 ++++++++++++++++++++
 .../protocol/mqtt/handler/MqttConnectHandler.java  |   8 +
 .../rocketmq/mqtt/cs/protocol/ssl/SslFactory.java  |  30 +++-
 .../rocketmq/mqtt/cs/session/loop/QueueCache.java  |  35 ++--
 .../mqtt/cs/session/loop/SessionLoopImpl.java      |   3 +-
 .../rocketmq/mqtt/cs/starter/MqttServer.java       | 177 +++++++++++++++++----
 .../cs/test/hook/TestEventHookManagerImpl.java     | 135 ++++++++++++++++
 .../cs/test/hook/TestUpstreamHookManagerImpl.java  |  28 ++--
 .../mqtt/handler/TestMqttConnectHandler.java       |   9 ++
 .../cs/test/session/infly/TestRetryDriver.java     |  30 ++--
 .../rocketmq/mqtt/ds/auth/AuthManagerSample.java   |   9 ++
 .../ds/event/processor/ClientEventProcessor.java   | 107 +++++++++++++
 .../rocketmq/mqtt/ds/notify/NotifyManager.java     |  15 +-
 .../mqtt/ds/store/LmqQueueStoreManager.java        |  65 ++++++++
 .../ds/upstream/mqtt/UpstreamProcessorManager.java |  10 +-
 .../upstream/mqtt/processor/PublishProcessor.java  |  12 +-
 .../event/processor/TestClientEventProcessor.java  |  97 +++++++++++
 .../ds/test/meta/TestMetaPersistManagerSample.java |  10 +-
 .../mqtt/ds/test/notify/TestNotifyManager.java     |   2 +-
 .../ds/test/store/TestLmqQueueStoreManager.java    |  17 ++
 .../mqtt/processor/TestPublishProcessor.java       |  29 ++--
 .../mqtt5/processor/TestPublishProcessor.java      |   3 +-
 .../mqtt/example/MqttClientEventConsumer.java      | 144 +++++++++++++++++
 .../exporter/collector/MqttMetricsCollector.java   |  24 +++
 .../mqtt/exporter/collector/MqttMetricsInfo.java   |  15 +-
 pom.xml                                            |  22 ++-
 42 files changed, 1624 insertions(+), 120 deletions(-)

diff --cc 
mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
index 6871255,ef65edb..e83366d
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java
@@@ -22,8 -21,9 +22,10 @@@ import io.netty.handler.codec.mqtt.Mqtt
  import io.netty.util.HashedWheelTimer;
  import io.netty.util.Timeout;
  import org.apache.commons.lang3.StringUtils;
+ import org.apache.rocketmq.mqtt.common.hook.EventHookManager;
+ import org.apache.rocketmq.mqtt.common.model.EventType;
  import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 +import org.apache.rocketmq.mqtt.cs.protocol.mqtt.facotry.MqttMessageFactory;
  import org.apache.rocketmq.mqtt.cs.session.Session;
  import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId;
  import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver;
@@@ -118,31 -124,6 +126,31 @@@ public class DefaultChannelManager impl
  
      @Override
      public void closeConnect(Channel channel, ChannelCloseFrom from, String 
reason) {
-         unloadResource(channel, reason);
++        unloadResource(channel, reason, from);
 +
 +        if (channel.isActive()) {
 +            channel.close();
 +        }
 +        logger.info("Close Connect of channel {} from {} by reason of {}", 
channel, from, reason);
 +    }
 +
 +    @Override
 +    public void closeConnect(Channel channel, ChannelCloseFrom from, String 
reason, byte reasonCode) {
-         unloadResource(channel, reason);
++        unloadResource(channel, reason, from);
 +
 +        if (channel.isActive()) {
 +            
channel.writeAndFlush(MqttMessageFactory.buildMqtt5DisconnectMessage(reasonCode,
 reason));
 +            channel.close();
 +        }
 +        logger.info("Close Connect of channel {} from {} by reason of {}", 
channel, from, reason);
 +    }
 +
 +    @Override
 +    public void closeConnectWithProtocolError(Channel channel, String reason) 
{
 +        closeConnect(channel, ChannelCloseFrom.SERVER, reason, 
MqttReasonCodes.Disconnect.PROTOCOL_ERROR.byteValue());
 +    }
 +
-     public void unloadResource(Channel channel, String reason) {
++    public void unloadResource(Channel channel, String reason, 
ChannelCloseFrom from) {
          String clientId = ChannelInfo.getClientId(channel);
          String channelId = ChannelInfo.getId(channel);
          willLoop.closeConnect(channel, clientId, reason);
diff --cc 
mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
index 82b1643,dcab351..4744eba
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
@@@ -57,11 -72,7 +72,12 @@@ public class ConnectConf 
      private boolean enablePrometheus = false;
      private boolean exportJvmInfo = true;
  
 +    private boolean enableSharedSubscription = false;
 +    private boolean enableRetain = false;
 +    private boolean enableSubscriptionIdentifier = false;
 +    private int topicAliasMaximum = 10;
 +    private int serverReceiveMaximum = 65535;
+     private int maxTransferCountOnMessageInDisk = 8;
  
      public ConnectConf() throws IOException {
          ClassPathResource classPathResource = new 
ClassPathResource(CONF_FILE_NAME);
@@@ -273,43 -284,19 +289,59 @@@
          this.sslServerKeyPassword = sslServerKeyPassword;
      }
  
 +    public boolean isEnableSharedSubscription() {
 +        return enableSharedSubscription;
 +    }
 +
 +    public void setEnableSharedSubscription(boolean enableSharedSubscription) 
{
 +        this.enableSharedSubscription = enableSharedSubscription;
 +    }
 +
 +    public boolean isEnableRetain() {
 +        return enableRetain;
 +    }
 +
 +    public void setEnableRetain(boolean enableRetain) {
 +        this.enableRetain = enableRetain;
 +    }
 +
 +    public boolean isEnableSubscriptionIdentifier() {
 +        return enableSubscriptionIdentifier;
 +    }
 +
 +    public void setEnableSubscriptionIdentifier(boolean 
enableSubscriptionIdentifier) {
 +        this.enableSubscriptionIdentifier = enableSubscriptionIdentifier;
 +    }
 +
 +    public int getTopicAliasMaximum() {
 +        return topicAliasMaximum;
 +    }
 +
 +    public void setTopicAliasMaximum(int topicAliasMaximum) {
 +        this.topicAliasMaximum = topicAliasMaximum;
 +    }
 +
 +    public int getServerReceiveMaximum() {
 +        return serverReceiveMaximum;
 +    }
 +
 +    public void setServerReceiveMaximum(int serverReceiveMaximum) {
 +        this.serverReceiveMaximum = serverReceiveMaximum;
 +    }
++
+     public int getMaxTransferCountOnMessageInDisk() {
+         return maxTransferCountOnMessageInDisk;
+     }
+ 
+     public void setMaxTransferCountOnMessageInDisk(int 
maxTransferCountOnMessageInDisk) {
+         this.maxTransferCountOnMessageInDisk = 
maxTransferCountOnMessageInDisk;
+     }
+ 
+     public int getQuicPort() {
+         return quicPort;
+     }
+ 
+     public void setQuicPort(int quicPort) {
+         this.quicPort = quicPort;
+     }
  }
diff --cc 
mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java
index 90bee39,5bf16ad..8b6af87
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java
@@@ -30,16 -35,18 +35,23 @@@ import io.netty.handler.codec.http.Http
  import io.netty.handler.codec.http.HttpServerCodec;
  import io.netty.handler.codec.mqtt.MqttDecoder;
  import io.netty.handler.codec.mqtt.MqttEncoder;
 +import io.netty.handler.codec.mqtt.MqttVersion;
  import io.netty.handler.ssl.SslHandler;
  import io.netty.handler.stream.ChunkedWriteHandler;
 +import org.apache.rocketmq.mqtt.cs.channel.ChannelManager;
+ import javax.annotation.PreDestroy;
+ import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
+ import io.netty.incubator.codec.quic.QuicChannel;
+ import io.netty.incubator.codec.quic.QuicServerCodecBuilder;
+ import io.netty.incubator.codec.quic.QuicStreamChannel;
+ import java.util.concurrent.TimeUnit;
  import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler;
+ import org.apache.rocketmq.mqtt.cs.channel.AdaptiveTlsHandler;
  import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 +import org.apache.rocketmq.mqtt.cs.protocol.ChannelPipelineLazyInit;
 +import org.apache.rocketmq.mqtt.cs.protocol.MqttVersionHandler;
  import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketDispatcher;
 +import org.apache.rocketmq.mqtt.cs.protocol.mqtt5.Mqtt5PacketDispatcher;
  import org.apache.rocketmq.mqtt.cs.protocol.ssl.SslFactory;
  import org.apache.rocketmq.mqtt.cs.protocol.ws.WebSocketServerHandler;
  import org.apache.rocketmq.mqtt.cs.protocol.ws.WebSocketEncoder;
@@@ -77,8 -84,11 +92,14 @@@ public class MqttServer 
      @Resource
      private SslFactory sslFactory;
  
++
 +    @Resource
 +    private ChannelManager channelManager;
+     private NioEventLoopGroup acceptorEventLoopGroup;
+ 
+     private NioEventLoopGroup workerEventLoopGroup;
+ 
+     private AdaptiveTlsHandler adaptiveTlsHandler;
  
      @PostConstruct
      public void init() throws Exception {
@@@ -147,9 -161,29 +172,38 @@@
                  }
              });
          tlsServerBootstrap.bind();
-         logger.warn("start mqtt tls server , port:{}", tlsPort);
+         LOGGER.info("MQTT server for TCP over TLS started, listening: {}", 
tlsPort);
+     }
+ 
+     private void assembleHandlerPipeline(ChannelPipeline pipeline) {
+         pipeline.addLast("connectHandler", connectHandler);
 -        pipeline.addLast("decoder", new 
MqttDecoder(connectConf.getMaxPacketSizeInByte()));
 -        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
 -        pipeline.addLast("dispatcher", mqttPacketDispatcher);
++        pipeline.addLast("versionHandler", new 
MqttVersionHandler(channelManager, new ChannelPipelineLazyInit() {
++            @Override
++            public void init(ChannelPipeline channelPipeline, MqttVersion 
mqttVersion) {
++                channelPipeline.addLast("decoder", new 
MqttDecoder(connectConf.getMaxPacketSizeInByte()));
++                channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
++                if (MqttVersion.MQTT_5.equals(mqttVersion)) {
++                    channelPipeline.addLast("mqtt5PacketDispatcher", 
mqtt5PacketDispatcher);
++                } else {
++                    channelPipeline.addLast("dispatcher", 
mqttPacketDispatcher);
++                }
++            }
++        }));
      }
  
+     /**
+      * Support Web Socket Transport.
+      * </p>
+      *
+      * Both WebSocket and WebSocket-Over-TLS/SSL are supported.
+      * </p>
+      *
+      * Clients are supposed to use one of the following Server URIs to 
connect:
+      * <ul>
+      *     <li>ws://host:port/mqtt</li>
+      *     <li>wss://host:port/mqtt</li>
+      * </ul>
+      */
      private void startWs() {
          int port = connectConf.getMqttWsPort();
          wsServerBootstrap
diff --cc 
mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java
index d160884,bdba0cb..d00fc35
--- 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java
+++ 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java
@@@ -89,9 -90,7 +90,10 @@@ public class AuthManagerSample extends 
                  logger.error("", e);
              }
              if (!Objects.equals(username, serviceConf.getUsername()) || 
!validateSign) {
 +                if (mqttConnectMessage.variableHeader().version() == 5) {
 +                    return new HookResult(HookResult.FAIL, 
MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD.byteValue(), 
Remark.AUTH_FAILED, null);
 +                }
+                 collectAuthFailedTps();
                  return new HookResult(HookResult.FAIL, 
MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.byteValue(), 
Remark.AUTH_FAILED, null);
              }
          }
diff --cc 
mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
index 5c1f71c,a2930f4..f53b6fe
--- 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
+++ 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java
@@@ -238,9 -281,16 +281,17 @@@ public class LmqQueueStoreManager imple
          }
      }
  
+     private void collectEventReadWriteRt(String action, long rt, boolean 
status) {
+         try {
+             MqttMetricsCollector.collectEventReadWriteRt(rt, action, 
String.valueOf(status));
+         } catch (PrometheusException e) {
+             logger.error("", e);
+         }
+     }
+ 
      @Override
 -    public CompletableFuture<PullResult> pullMessage(String firstTopic, Queue 
queue, QueueOffset queueOffset, long count) {
 +    public CompletableFuture<PullResult> pullMessage(String firstTopic, Queue 
queue, QueueOffset queueOffset,
 +                                                     long count) {
          CompletableFuture<PullResult> result = new CompletableFuture<>();
          try {
              MessageQueue messageQueue = new MessageQueue(firstTopic, 
queue.getBrokerName(), (int) queue.getQueueId());
diff --cc 
mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/UpstreamProcessorManager.java
index b822c7d,fadf0fd..3a0e8e6
--- 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/UpstreamProcessorManager.java
+++ 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/UpstreamProcessorManager.java
@@@ -25,11 -25,12 +25,12 @@@ import org.apache.rocketmq.mqtt.common.
  import org.apache.rocketmq.mqtt.common.hook.UpstreamHookEnum;
  import org.apache.rocketmq.mqtt.common.hook.UpstreamHookManager;
  import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
 -import org.apache.rocketmq.mqtt.ds.upstream.processor.ConnectProcessor;
 -import org.apache.rocketmq.mqtt.ds.upstream.processor.DisconnectProcessor;
 -import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor;
 -import org.apache.rocketmq.mqtt.ds.upstream.processor.SubscribeProcessor;
 -import org.apache.rocketmq.mqtt.ds.upstream.processor.UnSubscribeProcessor;
 +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.ConnectProcessor;
 +import 
org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.DisconnectProcessor;
 +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.PublishProcessor;
 +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.SubscribeProcessor;
 +import 
org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.UnSubscribeProcessor;
+ import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
  import org.springframework.stereotype.Component;
  
  import javax.annotation.PostConstruct;
@@@ -66,13 -67,7 +67,13 @@@ public class UpstreamProcessorManager e
  
      @Override
      public CompletableFuture<HookResult> 
processMqttMessage(MqttMessageUpContext context, MqttMessage message) {
- 
 +        CompletableFuture<HookResult> hookResult = new CompletableFuture<>();
 +        if (context.getMqttVersion() != null && 
MqttVersion.MQTT_5.equals(context.getMqttVersion())) {
 +            hookResult.complete(new HookResult(HookResult.SUCCESS, null, 
null));
 +            return hookResult;
 +        }
 +
+         collectProcessRequestTps(message.fixedHeader().messageType().name());
          switch (message.fixedHeader().messageType()) {
              case CONNECT:
                  return connectProcessor.process(context, message);
diff --cc 
mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/processor/PublishProcessor.java
index 66e3c64,01b4511..ae7b9df
--- 
a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/processor/PublishProcessor.java
+++ 
b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/processor/PublishProcessor.java
@@@ -35,7 -35,8 +35,8 @@@ import org.apache.rocketmq.mqtt.common.
  import org.apache.rocketmq.mqtt.common.util.TopicUtils;
  import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
  import org.apache.rocketmq.mqtt.ds.meta.WildcardManager;
 -import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor;
 +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.UpstreamProcessor;
+ import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.stereotype.Component;
diff --cc 
mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt/processor/TestPublishProcessor.java
index 82652a7,ac5b07c..c159c52
--- 
a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt/processor/TestPublishProcessor.java
+++ 
b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt/processor/TestPublishProcessor.java
@@@ -1,23 -1,21 +1,21 @@@
  /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
   *
-  *  * Licensed to the Apache Software Foundation (ASF) under one or more
-  *  * contributor license agreements.  See the NOTICE file distributed with
-  *  * this work for additional information regarding copyright ownership.
-  *  * The ASF licenses this file to You under the Apache License, Version 2.0
-  *  * (the "License"); you may not use this file except in compliance with
-  *  * the License.  You may obtain a copy of the License at
-  *  *
-  *  *     http://www.apache.org/licenses/LICENSE-2.0
-  *  *
-  *  * Unless required by applicable law or agreed to in writing, software
-  *  * distributed under the License is distributed on an "AS IS" BASIS,
-  *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  *  * See the License for the specific language governing permissions and
-  *  * limitations under the License.
+  *     http://www.apache.org/licenses/LICENSE-2.0
   *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
   */
  
 -package org.apache.rocketmq.mqtt.ds.test.upstream.processor;
 +package org.apache.rocketmq.mqtt.ds.test.upstream.mqtt.processor;
  
  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.Unpooled;
diff --cc 
mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt5/processor/TestPublishProcessor.java
index f77efd7,0000000..9a5651f
mode 100644,000000..100644
--- 
a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt5/processor/TestPublishProcessor.java
+++ 
b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt5/processor/TestPublishProcessor.java
@@@ -1,151 -1,0 +1,152 @@@
 +/*
 + *
 + *  * Licensed to the Apache Software Foundation (ASF) under one or more
 + *  * contributor license agreements.  See the NOTICE file distributed with
 + *  * this work for additional information regarding copyright ownership.
 + *  * The ASF licenses this file to You under the Apache License, Version 2.0
 + *  * (the "License"); you may not use this file except in compliance with
 + *  * the License.  You may obtain a copy of the License at
 + *  *
 + *  *     http://www.apache.org/licenses/LICENSE-2.0
 + *  *
 + *  * Unless required by applicable law or agreed to in writing, software
 + *  * distributed under the License is distributed on an "AS IS" BASIS,
 + *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  * See the License for the specific language governing permissions and
 + *  * limitations under the License.
 + *
 + */
 +
 +package org.apache.rocketmq.mqtt.ds.test.upstream.mqtt5.processor;
 +
 +import io.netty.buffer.ByteBuf;
 +import io.netty.buffer.Unpooled;
 +import io.netty.handler.codec.mqtt.MqttFixedHeader;
 +import io.netty.handler.codec.mqtt.MqttMessageType;
 +import io.netty.handler.codec.mqtt.MqttProperties;
 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
 +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 +import io.netty.handler.codec.mqtt.MqttQoS;
 +import org.apache.commons.lang3.reflect.FieldUtils;
 +import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore;
 +import org.apache.rocketmq.mqtt.common.hook.HookResult;
++import org.apache.rocketmq.mqtt.common.model.Message;
 +import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext;
 +import org.apache.rocketmq.mqtt.common.model.StoreResult;
 +import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
 +import org.apache.rocketmq.mqtt.ds.meta.WildcardManager;
 +import org.apache.rocketmq.mqtt.ds.upstream.mqtt5.processor.PublishProcessor5;
 +import org.apache.rocketmq.remoting.exception.RemotingException;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.mockito.Mock;
 +import org.mockito.junit.MockitoJUnitRunner;
 +
 +import java.nio.charset.StandardCharsets;
 +import java.util.Collections;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.CompletableFuture;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ExecutionException;
 +
 +import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
 +import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
 +import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR;
 +import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL;
 +import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC;
 +import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
 +import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS;
 +import static org.mockito.ArgumentMatchers.any;
 +import static org.mockito.ArgumentMatchers.anySet;
 +import static org.mockito.ArgumentMatchers.anyString;
 +import static org.mockito.Mockito.verify;
 +import static org.mockito.Mockito.when;
 +
 +@RunWith(MockitoJUnitRunner.class)
 +public class TestPublishProcessor {
 +
 +    @Mock
 +    private LmqQueueStore lmqQueueStore;
 +
 +    @Mock
 +    private WildcardManager wildcardManager;
 +
 +    @Mock
 +    private FirstTopicManager firstTopicManager;
 +
 +    private MqttPublishMessage publishMessage;
 +
 +    private PublishProcessor5 publishProcessor5;
 +
 +    @Before
 +    public void init() throws IllegalAccessException {
 +        publishProcessor5 = new PublishProcessor5();
 +        FieldUtils.writeDeclaredField(publishProcessor5, "lmqQueueStore", 
lmqQueueStore, true);
 +        FieldUtils.writeDeclaredField(publishProcessor5, "wildcardManager", 
wildcardManager, true);
 +        FieldUtils.writeDeclaredField(publishProcessor5, "firstTopicManager", 
firstTopicManager, true);
 +
 +        MqttProperties props = new MqttProperties();
 +        props.add(new 
MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
 +        props.add(new 
MqttProperties.IntegerProperty(PUBLICATION_EXPIRY_INTERVAL.value(), 10));
 +        props.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), 1));
 +        props.add(new MqttProperties.StringProperty(RESPONSE_TOPIC.value(), 
"Response Topic"));
 +        props.add(new MqttProperties.BinaryProperty(CORRELATION_DATA.value(), 
"Correlation Data".getBytes(StandardCharsets.UTF_8)));
 +        props.add(new MqttProperties.StringProperty(CONTENT_TYPE.value(), 
"Content Type"));
 +
 +        props.add(new MqttProperties.UserProperty("isSecret", "true"));
 +        props.add(new MqttProperties.UserProperty("tag", "firstTag"));
 +        props.add(new MqttProperties.UserProperty("tag", "secondTag"));
 +
 +        props.add(new 
MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 100));
 +        props.add(new 
MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 101));
 +        props.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), 
10));
 +
 +        MqttFixedHeader mqttFixedHeader = new 
MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 
1);
 +        MqttPublishVariableHeader variableHeader = new 
MqttPublishVariableHeader("test", 0, props);
 +        ByteBuf payload = 
Unpooled.copiedBuffer("test".getBytes(StandardCharsets.UTF_8));
 +        publishMessage = new MqttPublishMessage(mqttFixedHeader, 
variableHeader, payload);
 +
 +        Set<String> queues = Collections.singleton("testQueue");
 +        when(wildcardManager.matchQueueSetByMsgTopic(anyString(), 
anyString())).thenReturn(queues);
 +        CompletableFuture<StoreResult> storeResultFuture = new 
CompletableFuture<>();
 +        StoreResult storeResult = new StoreResult();
 +        storeResultFuture.complete(storeResult);
-         when(lmqQueueStore.putMessage(anySet(), 
any())).thenReturn(storeResultFuture);
++        when(lmqQueueStore.putMessage(anySet(), (Message) 
any())).thenReturn(storeResultFuture);
 +
 +    }
 +
 +    @Test
 +    public void test() throws IllegalAccessException, ExecutionException, 
InterruptedException, RemotingException, 
com.alipay.sofa.jraft.error.RemotingException {
 +
 +        MqttMessageUpContext upContext = new MqttMessageUpContext();
 +        upContext.setNamespace("testPubProcessor");
 +
 +        CompletableFuture<HookResult> hookResultCompletableFuture = 
publishProcessor5.process(upContext, publishMessage);
 +
 +        verify(firstTopicManager).checkFirstTopicIfCreated(anyString());
 +        Assert.assertNull(hookResultCompletableFuture.get().getRemark());
 +        Assert.assertNotNull(hookResultCompletableFuture.get().getData());
 +    }
 +
 +    @Test
 +    public void testClientTopicAlias() throws ExecutionException, 
InterruptedException {
 +
 +        MqttMessageUpContext upContext = new MqttMessageUpContext();
 +        upContext.setNamespace("testPubProcessor");
 +
 +        Map<Integer, String> topicAliasMap = new ConcurrentHashMap<>();
 +        topicAliasMap.put(10, "testQueue");
 +        upContext.setClientTopicAliasMap(topicAliasMap);
 +
 +        CompletableFuture<HookResult> hookResultCompletableFuture = 
publishProcessor5.process(upContext, publishMessage);
 +
 +        Assert.assertEquals(topicAliasMap.get(10), "test");
 +        verify(firstTopicManager).checkFirstTopicIfCreated(anyString());
 +        Assert.assertNull(hookResultCompletableFuture.get().getRemark());
 +        Assert.assertNotNull(hookResultCompletableFuture.get().getData());
 +    }
 +
 +}
diff --cc pom.xml
index c34e7a2,56cab7e..de2e899
--- a/pom.xml
+++ b/pom.xml
@@@ -46,9 -46,10 +46,10 @@@
          <protoc-gen-grpc-java.version>1.24.0</protoc-gen-grpc-java.version>
          <rpc-grpc-impl.version>1.3.8</rpc-grpc-impl.version>
          <guava.version>32.0.0-jre</guava.version>
-         <jraft-core.version>1.3.11</jraft-core.version>
 +        <mqtt.codec.version>4.1.100.Final</mqtt.codec.version>
- 
+         <jraft-core.version>1.3.12</jraft-core.version>
 -        <netty.version>4.1.43.Final</netty.version>
+         <netty.tcnative.version>2.0.26.Final</netty.tcnative.version>
+         <netty.quic.version>0.0.62.Final</netty.quic.version>
      </properties>
  
      <dependencyManagement>
@@@ -101,7 -97,7 +102,7 @@@
              <dependency>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-all</artifactId>
-                 <version>4.1.100.Final</version>
 -                <version>${netty.version}</version>
++                <version>${mqtt.codec.version}</version>
              </dependency>
              <dependency>
                  <groupId>io.netty</groupId>

Reply via email to