This is an automated email from the ASF dual-hosted git repository. dongyuanpan pushed a commit to branch develop_mqtt5.0 in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
commit 4d2e294a2dab71bce84e375163f8400c9a378865 Merge: b210bd5 3cd226c Author: dongyuan.pdy <dongyuanp...@gmail.com> AuthorDate: Thu Jun 27 09:52:43 2024 +0800 merge develop distribution/conf/spring.xml | 2 +- docs/quic.md | 16 ++ mqtt-common/pom.xml | 5 + .../rocketmq/mqtt/common/facade/LmqQueueStore.java | 9 ++ .../mqtt/common/hook/AbstractUpstreamHook.java | 5 +- .../rocketmq/mqtt/common/hook/EventHook.java | 45 ++++++ .../mqtt/common/hook/EventHookManager.java | 42 +++++ .../rocketmq/mqtt/common/model/ClientEvent.java | 123 ++++++++++++++ .../rocketmq/mqtt/common/model/Constants.java | 6 + .../rocketmq/mqtt/common/model/EventType.java | 23 +++ .../rocketmq/mqtt/common/util/NetworkHelper.java | 35 ++++ .../mqtt/common/test/model/TestClientEvent.java | 57 +++++++ mqtt-cs/pom.xml | 11 +- .../mqtt/cs/channel/AdaptiveTlsHandler.java | 93 +++++++++++ .../mqtt/cs/channel/DefaultChannelManager.java | 33 +++- .../rocketmq/mqtt/cs/config/ConnectConf.java | 32 ++++ .../mqtt/cs/hook/EventHookManagerImpl.java | 171 ++++++++++++++++++++ .../protocol/mqtt/handler/MqttConnectHandler.java | 8 + .../rocketmq/mqtt/cs/protocol/ssl/SslFactory.java | 30 +++- .../rocketmq/mqtt/cs/session/loop/QueueCache.java | 35 ++-- .../mqtt/cs/session/loop/SessionLoopImpl.java | 3 +- .../rocketmq/mqtt/cs/starter/MqttServer.java | 177 +++++++++++++++++---- .../cs/test/hook/TestEventHookManagerImpl.java | 135 ++++++++++++++++ .../cs/test/hook/TestUpstreamHookManagerImpl.java | 28 ++-- .../mqtt/handler/TestMqttConnectHandler.java | 9 ++ .../cs/test/session/infly/TestRetryDriver.java | 30 ++-- .../rocketmq/mqtt/ds/auth/AuthManagerSample.java | 9 ++ .../ds/event/processor/ClientEventProcessor.java | 107 +++++++++++++ .../rocketmq/mqtt/ds/notify/NotifyManager.java | 15 +- .../mqtt/ds/store/LmqQueueStoreManager.java | 65 ++++++++ .../ds/upstream/mqtt/UpstreamProcessorManager.java | 10 +- .../upstream/mqtt/processor/PublishProcessor.java | 12 +- .../event/processor/TestClientEventProcessor.java | 97 +++++++++++ .../ds/test/meta/TestMetaPersistManagerSample.java | 10 +- .../mqtt/ds/test/notify/TestNotifyManager.java | 2 +- .../ds/test/store/TestLmqQueueStoreManager.java | 17 ++ .../mqtt/processor/TestPublishProcessor.java | 29 ++-- .../mqtt5/processor/TestPublishProcessor.java | 3 +- .../mqtt/example/MqttClientEventConsumer.java | 144 +++++++++++++++++ .../exporter/collector/MqttMetricsCollector.java | 24 +++ .../mqtt/exporter/collector/MqttMetricsInfo.java | 15 +- pom.xml | 22 ++- 42 files changed, 1624 insertions(+), 120 deletions(-) diff --cc mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java index 6871255,ef65edb..e83366d --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/channel/DefaultChannelManager.java @@@ -22,8 -21,9 +22,10 @@@ import io.netty.handler.codec.mqtt.Mqtt import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.mqtt.common.hook.EventHookManager; + import org.apache.rocketmq.mqtt.common.model.EventType; import org.apache.rocketmq.mqtt.cs.config.ConnectConf; +import org.apache.rocketmq.mqtt.cs.protocol.mqtt.facotry.MqttMessageFactory; import org.apache.rocketmq.mqtt.cs.session.Session; import org.apache.rocketmq.mqtt.cs.session.infly.MqttMsgId; import org.apache.rocketmq.mqtt.cs.session.infly.RetryDriver; @@@ -118,31 -124,6 +126,31 @@@ public class DefaultChannelManager impl @Override public void closeConnect(Channel channel, ChannelCloseFrom from, String reason) { - unloadResource(channel, reason); ++ unloadResource(channel, reason, from); + + if (channel.isActive()) { + channel.close(); + } + logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason); + } + + @Override + public void closeConnect(Channel channel, ChannelCloseFrom from, String reason, byte reasonCode) { - unloadResource(channel, reason); ++ unloadResource(channel, reason, from); + + if (channel.isActive()) { + channel.writeAndFlush(MqttMessageFactory.buildMqtt5DisconnectMessage(reasonCode, reason)); + channel.close(); + } + logger.info("Close Connect of channel {} from {} by reason of {}", channel, from, reason); + } + + @Override + public void closeConnectWithProtocolError(Channel channel, String reason) { + closeConnect(channel, ChannelCloseFrom.SERVER, reason, MqttReasonCodes.Disconnect.PROTOCOL_ERROR.byteValue()); + } + - public void unloadResource(Channel channel, String reason) { ++ public void unloadResource(Channel channel, String reason, ChannelCloseFrom from) { String clientId = ChannelInfo.getClientId(channel); String channelId = ChannelInfo.getId(channel); willLoop.closeConnect(channel, clientId, reason); diff --cc mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java index 82b1643,dcab351..4744eba --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java @@@ -57,11 -72,7 +72,12 @@@ public class ConnectConf private boolean enablePrometheus = false; private boolean exportJvmInfo = true; + private boolean enableSharedSubscription = false; + private boolean enableRetain = false; + private boolean enableSubscriptionIdentifier = false; + private int topicAliasMaximum = 10; + private int serverReceiveMaximum = 65535; + private int maxTransferCountOnMessageInDisk = 8; public ConnectConf() throws IOException { ClassPathResource classPathResource = new ClassPathResource(CONF_FILE_NAME); @@@ -273,43 -284,19 +289,59 @@@ this.sslServerKeyPassword = sslServerKeyPassword; } + public boolean isEnableSharedSubscription() { + return enableSharedSubscription; + } + + public void setEnableSharedSubscription(boolean enableSharedSubscription) { + this.enableSharedSubscription = enableSharedSubscription; + } + + public boolean isEnableRetain() { + return enableRetain; + } + + public void setEnableRetain(boolean enableRetain) { + this.enableRetain = enableRetain; + } + + public boolean isEnableSubscriptionIdentifier() { + return enableSubscriptionIdentifier; + } + + public void setEnableSubscriptionIdentifier(boolean enableSubscriptionIdentifier) { + this.enableSubscriptionIdentifier = enableSubscriptionIdentifier; + } + + public int getTopicAliasMaximum() { + return topicAliasMaximum; + } + + public void setTopicAliasMaximum(int topicAliasMaximum) { + this.topicAliasMaximum = topicAliasMaximum; + } + + public int getServerReceiveMaximum() { + return serverReceiveMaximum; + } + + public void setServerReceiveMaximum(int serverReceiveMaximum) { + this.serverReceiveMaximum = serverReceiveMaximum; + } ++ + public int getMaxTransferCountOnMessageInDisk() { + return maxTransferCountOnMessageInDisk; + } + + public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) { + this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk; + } + + public int getQuicPort() { + return quicPort; + } + + public void setQuicPort(int quicPort) { + this.quicPort = quicPort; + } } diff --cc mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java index 90bee39,5bf16ad..8b6af87 --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java @@@ -30,16 -35,18 +35,23 @@@ import io.netty.handler.codec.http.Http import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.codec.mqtt.MqttVersion; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.rocketmq.mqtt.cs.channel.ChannelManager; + import javax.annotation.PreDestroy; + import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; + import io.netty.incubator.codec.quic.QuicChannel; + import io.netty.incubator.codec.quic.QuicServerCodecBuilder; + import io.netty.incubator.codec.quic.QuicStreamChannel; + import java.util.concurrent.TimeUnit; import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler; + import org.apache.rocketmq.mqtt.cs.channel.AdaptiveTlsHandler; import org.apache.rocketmq.mqtt.cs.config.ConnectConf; +import org.apache.rocketmq.mqtt.cs.protocol.ChannelPipelineLazyInit; +import org.apache.rocketmq.mqtt.cs.protocol.MqttVersionHandler; import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketDispatcher; +import org.apache.rocketmq.mqtt.cs.protocol.mqtt5.Mqtt5PacketDispatcher; import org.apache.rocketmq.mqtt.cs.protocol.ssl.SslFactory; import org.apache.rocketmq.mqtt.cs.protocol.ws.WebSocketServerHandler; import org.apache.rocketmq.mqtt.cs.protocol.ws.WebSocketEncoder; @@@ -77,8 -84,11 +92,14 @@@ public class MqttServer @Resource private SslFactory sslFactory; ++ + @Resource + private ChannelManager channelManager; + private NioEventLoopGroup acceptorEventLoopGroup; + + private NioEventLoopGroup workerEventLoopGroup; + + private AdaptiveTlsHandler adaptiveTlsHandler; @PostConstruct public void init() throws Exception { @@@ -147,9 -161,29 +172,38 @@@ } }); tlsServerBootstrap.bind(); - logger.warn("start mqtt tls server , port:{}", tlsPort); + LOGGER.info("MQTT server for TCP over TLS started, listening: {}", tlsPort); + } + + private void assembleHandlerPipeline(ChannelPipeline pipeline) { + pipeline.addLast("connectHandler", connectHandler); - pipeline.addLast("decoder", new MqttDecoder(connectConf.getMaxPacketSizeInByte())); - pipeline.addLast("encoder", MqttEncoder.INSTANCE); - pipeline.addLast("dispatcher", mqttPacketDispatcher); ++ pipeline.addLast("versionHandler", new MqttVersionHandler(channelManager, new ChannelPipelineLazyInit() { ++ @Override ++ public void init(ChannelPipeline channelPipeline, MqttVersion mqttVersion) { ++ channelPipeline.addLast("decoder", new MqttDecoder(connectConf.getMaxPacketSizeInByte())); ++ channelPipeline.addLast("encoder", MqttEncoder.INSTANCE); ++ if (MqttVersion.MQTT_5.equals(mqttVersion)) { ++ channelPipeline.addLast("mqtt5PacketDispatcher", mqtt5PacketDispatcher); ++ } else { ++ channelPipeline.addLast("dispatcher", mqttPacketDispatcher); ++ } ++ } ++ })); } + /** + * Support Web Socket Transport. + * </p> + * + * Both WebSocket and WebSocket-Over-TLS/SSL are supported. + * </p> + * + * Clients are supposed to use one of the following Server URIs to connect: + * <ul> + * <li>ws://host:port/mqtt</li> + * <li>wss://host:port/mqtt</li> + * </ul> + */ private void startWs() { int port = connectConf.getMqttWsPort(); wsServerBootstrap diff --cc mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java index d160884,bdba0cb..d00fc35 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/auth/AuthManagerSample.java @@@ -89,9 -90,7 +90,10 @@@ public class AuthManagerSample extends logger.error("", e); } if (!Objects.equals(username, serviceConf.getUsername()) || !validateSign) { + if (mqttConnectMessage.variableHeader().version() == 5) { + return new HookResult(HookResult.FAIL, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD.byteValue(), Remark.AUTH_FAILED, null); + } + collectAuthFailedTps(); return new HookResult(HookResult.FAIL, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.byteValue(), Remark.AUTH_FAILED, null); } } diff --cc mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java index 5c1f71c,a2930f4..f53b6fe --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/store/LmqQueueStoreManager.java @@@ -238,9 -281,16 +281,17 @@@ public class LmqQueueStoreManager imple } } + private void collectEventReadWriteRt(String action, long rt, boolean status) { + try { + MqttMetricsCollector.collectEventReadWriteRt(rt, action, String.valueOf(status)); + } catch (PrometheusException e) { + logger.error("", e); + } + } + @Override - public CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, long count) { + public CompletableFuture<PullResult> pullMessage(String firstTopic, Queue queue, QueueOffset queueOffset, + long count) { CompletableFuture<PullResult> result = new CompletableFuture<>(); try { MessageQueue messageQueue = new MessageQueue(firstTopic, queue.getBrokerName(), (int) queue.getQueueId()); diff --cc mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/UpstreamProcessorManager.java index b822c7d,fadf0fd..3a0e8e6 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/UpstreamProcessorManager.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/UpstreamProcessorManager.java @@@ -25,11 -25,12 +25,12 @@@ import org.apache.rocketmq.mqtt.common. import org.apache.rocketmq.mqtt.common.hook.UpstreamHookEnum; import org.apache.rocketmq.mqtt.common.hook.UpstreamHookManager; import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext; -import org.apache.rocketmq.mqtt.ds.upstream.processor.ConnectProcessor; -import org.apache.rocketmq.mqtt.ds.upstream.processor.DisconnectProcessor; -import org.apache.rocketmq.mqtt.ds.upstream.processor.PublishProcessor; -import org.apache.rocketmq.mqtt.ds.upstream.processor.SubscribeProcessor; -import org.apache.rocketmq.mqtt.ds.upstream.processor.UnSubscribeProcessor; +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.ConnectProcessor; +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.DisconnectProcessor; +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.PublishProcessor; +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.SubscribeProcessor; +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.processor.UnSubscribeProcessor; + import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@@ -66,13 -67,7 +67,13 @@@ public class UpstreamProcessorManager e @Override public CompletableFuture<HookResult> processMqttMessage(MqttMessageUpContext context, MqttMessage message) { - + CompletableFuture<HookResult> hookResult = new CompletableFuture<>(); + if (context.getMqttVersion() != null && MqttVersion.MQTT_5.equals(context.getMqttVersion())) { + hookResult.complete(new HookResult(HookResult.SUCCESS, null, null)); + return hookResult; + } + + collectProcessRequestTps(message.fixedHeader().messageType().name()); switch (message.fixedHeader().messageType()) { case CONNECT: return connectProcessor.process(context, message); diff --cc mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/processor/PublishProcessor.java index 66e3c64,01b4511..ae7b9df --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/processor/PublishProcessor.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/upstream/mqtt/processor/PublishProcessor.java @@@ -35,7 -35,8 +35,8 @@@ import org.apache.rocketmq.mqtt.common. import org.apache.rocketmq.mqtt.common.util.TopicUtils; import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager; import org.apache.rocketmq.mqtt.ds.meta.WildcardManager; -import org.apache.rocketmq.mqtt.ds.upstream.UpstreamProcessor; +import org.apache.rocketmq.mqtt.ds.upstream.mqtt.UpstreamProcessor; + import org.apache.rocketmq.mqtt.exporter.collector.MqttMetricsCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; diff --cc mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt/processor/TestPublishProcessor.java index 82652a7,ac5b07c..c159c52 --- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt/processor/TestPublishProcessor.java +++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt/processor/TestPublishProcessor.java @@@ -1,23 -1,21 +1,21 @@@ /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.apache.rocketmq.mqtt.ds.test.upstream.processor; +package org.apache.rocketmq.mqtt.ds.test.upstream.mqtt.processor; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --cc mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt5/processor/TestPublishProcessor.java index f77efd7,0000000..9a5651f mode 100644,000000..100644 --- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt5/processor/TestPublishProcessor.java +++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/upstream/mqtt5/processor/TestPublishProcessor.java @@@ -1,151 -1,0 +1,152 @@@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.rocketmq.mqtt.ds.test.upstream.mqtt5.processor; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.mqtt.common.facade.LmqQueueStore; +import org.apache.rocketmq.mqtt.common.hook.HookResult; ++import org.apache.rocketmq.mqtt.common.model.Message; +import org.apache.rocketmq.mqtt.common.model.MqttMessageUpContext; +import org.apache.rocketmq.mqtt.common.model.StoreResult; +import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager; +import org.apache.rocketmq.mqtt.ds.meta.WildcardManager; +import org.apache.rocketmq.mqtt.ds.upstream.mqtt5.processor.PublishProcessor5; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RESPONSE_TOPIC; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestPublishProcessor { + + @Mock + private LmqQueueStore lmqQueueStore; + + @Mock + private WildcardManager wildcardManager; + + @Mock + private FirstTopicManager firstTopicManager; + + private MqttPublishMessage publishMessage; + + private PublishProcessor5 publishProcessor5; + + @Before + public void init() throws IllegalAccessException { + publishProcessor5 = new PublishProcessor5(); + FieldUtils.writeDeclaredField(publishProcessor5, "lmqQueueStore", lmqQueueStore, true); + FieldUtils.writeDeclaredField(publishProcessor5, "wildcardManager", wildcardManager, true); + FieldUtils.writeDeclaredField(publishProcessor5, "firstTopicManager", firstTopicManager, true); + + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); + props.add(new MqttProperties.IntegerProperty(PUBLICATION_EXPIRY_INTERVAL.value(), 10)); + props.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), 1)); + props.add(new MqttProperties.StringProperty(RESPONSE_TOPIC.value(), "Response Topic")); + props.add(new MqttProperties.BinaryProperty(CORRELATION_DATA.value(), "Correlation Data".getBytes(StandardCharsets.UTF_8))); + props.add(new MqttProperties.StringProperty(CONTENT_TYPE.value(), "Content Type")); + + props.add(new MqttProperties.UserProperty("isSecret", "true")); + props.add(new MqttProperties.UserProperty("tag", "firstTag")); + props.add(new MqttProperties.UserProperty("tag", "secondTag")); + + props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 100)); + props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 101)); + props.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), 10)); + + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 1); + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("test", 0, props); + ByteBuf payload = Unpooled.copiedBuffer("test".getBytes(StandardCharsets.UTF_8)); + publishMessage = new MqttPublishMessage(mqttFixedHeader, variableHeader, payload); + + Set<String> queues = Collections.singleton("testQueue"); + when(wildcardManager.matchQueueSetByMsgTopic(anyString(), anyString())).thenReturn(queues); + CompletableFuture<StoreResult> storeResultFuture = new CompletableFuture<>(); + StoreResult storeResult = new StoreResult(); + storeResultFuture.complete(storeResult); - when(lmqQueueStore.putMessage(anySet(), any())).thenReturn(storeResultFuture); ++ when(lmqQueueStore.putMessage(anySet(), (Message) any())).thenReturn(storeResultFuture); + + } + + @Test + public void test() throws IllegalAccessException, ExecutionException, InterruptedException, RemotingException, com.alipay.sofa.jraft.error.RemotingException { + + MqttMessageUpContext upContext = new MqttMessageUpContext(); + upContext.setNamespace("testPubProcessor"); + + CompletableFuture<HookResult> hookResultCompletableFuture = publishProcessor5.process(upContext, publishMessage); + + verify(firstTopicManager).checkFirstTopicIfCreated(anyString()); + Assert.assertNull(hookResultCompletableFuture.get().getRemark()); + Assert.assertNotNull(hookResultCompletableFuture.get().getData()); + } + + @Test + public void testClientTopicAlias() throws ExecutionException, InterruptedException { + + MqttMessageUpContext upContext = new MqttMessageUpContext(); + upContext.setNamespace("testPubProcessor"); + + Map<Integer, String> topicAliasMap = new ConcurrentHashMap<>(); + topicAliasMap.put(10, "testQueue"); + upContext.setClientTopicAliasMap(topicAliasMap); + + CompletableFuture<HookResult> hookResultCompletableFuture = publishProcessor5.process(upContext, publishMessage); + + Assert.assertEquals(topicAliasMap.get(10), "test"); + verify(firstTopicManager).checkFirstTopicIfCreated(anyString()); + Assert.assertNull(hookResultCompletableFuture.get().getRemark()); + Assert.assertNotNull(hookResultCompletableFuture.get().getData()); + } + +} diff --cc pom.xml index c34e7a2,56cab7e..de2e899 --- a/pom.xml +++ b/pom.xml @@@ -46,9 -46,10 +46,10 @@@ <protoc-gen-grpc-java.version>1.24.0</protoc-gen-grpc-java.version> <rpc-grpc-impl.version>1.3.8</rpc-grpc-impl.version> <guava.version>32.0.0-jre</guava.version> - <jraft-core.version>1.3.11</jraft-core.version> + <mqtt.codec.version>4.1.100.Final</mqtt.codec.version> - + <jraft-core.version>1.3.12</jraft-core.version> - <netty.version>4.1.43.Final</netty.version> + <netty.tcnative.version>2.0.26.Final</netty.tcnative.version> + <netty.quic.version>0.0.62.Final</netty.quic.version> </properties> <dependencyManagement> @@@ -101,7 -97,7 +102,7 @@@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> - <version>4.1.100.Final</version> - <version>${netty.version}</version> ++ <version>${mqtt.codec.version}</version> </dependency> <dependency> <groupId>io.netty</groupId>