This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git

commit 90cfe7c90647e738963fe1e21489aff19865742d
Author: Li Zhanhui <lizhan...@gmail.com>
AuthorDate: Fri Apr 12 19:13:03 2024 +0800

    feat: add experimental support for QUIC transport
---
 docs/quic.md                                       | 16 +++++
 mqtt-common/pom.xml                                |  5 ++
 .../rocketmq/mqtt/common/util/NetworkHelper.java   | 35 ++++++++++
 mqtt-cs/pom.xml                                    |  5 ++
 .../rocketmq/mqtt/cs/config/ConnectConf.java       | 23 +++++++
 .../rocketmq/mqtt/cs/protocol/ssl/SslFactory.java  | 18 ++++-
 .../rocketmq/mqtt/cs/starter/MqttServer.java       | 80 ++++++++++++++++++++--
 pom.xml                                            | 14 ++++
 8 files changed, 190 insertions(+), 6 deletions(-)

diff --git a/docs/quic.md b/docs/quic.md
new file mode 100644
index 0000000..025d71a
--- /dev/null
+++ b/docs/quic.md
@@ -0,0 +1,16 @@
+## Setup
+Download emqtt-bench with QUIC feature
+```shell
+wget 
https://github.com/emqx/emqtt-bench/releases/download/0.4.18/emqtt-bench-0.4.18-macos13-amd64-quic.zip
+```
+
+## Test Connect
+```shell
+./bin/emqtt_bench conn -u user0 -P secret0 -V 4 --quic true -h localhost -p 
14567 -c 10
+```
+
+### Pub
+
+```shell
+./bin/emqtt_bench pub -u user0 -P secret0 -V 4 --quic true -h localhost -p 
14567 -c 1 -t T_Event/test
+```
diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml
index 1edfa50..47d1c0d 100644
--- a/mqtt-common/pom.xml
+++ b/mqtt-common/pom.xml
@@ -68,6 +68,11 @@
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty.incubator</groupId>
+            <artifactId>netty-incubator-codec-native-quic</artifactId>
+            <classifier>${os.detected.classifier}</classifier>
+        </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
diff --git 
a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/NetworkHelper.java
 
b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/NetworkHelper.java
new file mode 100644
index 0000000..769cd89
--- /dev/null
+++ 
b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/NetworkHelper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mqtt.common.util;
+
+import io.netty.incubator.codec.quic.QuicStreamAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+
+public class NetworkHelper {
+    public static String socketAddressToString(SocketAddress address) {
+        if (address instanceof InetSocketAddress) {
+            return NetworkUtil.socketAddress2String(address);
+        } else if (address instanceof QuicStreamAddress) {
+            QuicStreamAddress quicAddress = (QuicStreamAddress) address;
+            return String.format("quic[%d]", quicAddress.streamId());
+        }
+        throw new RuntimeException("Unsupported SocketAddress type");
+    }
+}
diff --git a/mqtt-cs/pom.xml b/mqtt-cs/pom.xml
index c1c4521..da12f23 100644
--- a/mqtt-cs/pom.xml
+++ b/mqtt-cs/pom.xml
@@ -34,6 +34,11 @@
             <groupId>io.netty</groupId>
             <artifactId>netty-tcnative-boringssl-static</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty.incubator</groupId>
+            <artifactId>netty-incubator-codec-native-quic</artifactId>
+            <classifier>${os.detected.classifier}</classifier>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
index f558023..dcab351 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java
@@ -32,9 +32,24 @@ public class ConnectConf {
     private File confFile;
     private int nettySelectorThreadNum = 1;
     private int nettyWorkerThreadNum = 
Runtime.getRuntime().availableProcessors() * 2;
+
+    /**
+     * TCP ports 8883 and 1883 are registered with IANA for MQTT TLS and 
non-TLS communication respectively.
+     */
     private int mqttPort = 1883;
+
+    /**
+     * TCP ports 8883 and 1883 are registered with IANA for MQTT TLS and 
non-TLS communication respectively.
+     */
     private int mqttTlsPort = 8883;
+
+    /**
+     * Uses this port for both normal WebSocket connections and WebSocket 
connections over TLS as well.
+     */
     private int mqttWsPort = 8888;
+
+    private int quicPort = 14567;
+
     private boolean enableTlsSever = false;
     private boolean needClientAuth = false;
     private String sslCaCertFile;
@@ -276,4 +291,12 @@ public class ConnectConf {
     public void setMaxTransferCountOnMessageInDisk(int 
maxTransferCountOnMessageInDisk) {
         this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk;
     }
+
+    public int getQuicPort() {
+        return quicPort;
+    }
+
+    public void setQuicPort(int quicPort) {
+        this.quicPort = quicPort;
+    }
 }
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/ssl/SslFactory.java
 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/ssl/SslFactory.java
index 08d776e..f214eae 100644
--- 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/ssl/SslFactory.java
+++ 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/ssl/SslFactory.java
@@ -24,6 +24,10 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 import org.apache.commons.lang3.StringUtils;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import io.netty.incubator.codec.quic.QuicSslContext;
+import io.netty.incubator.codec.quic.QuicSslContextBuilder;
+import java.security.cert.CertificateException;
 import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.srvutil.FileWatchService;
 import org.slf4j.Logger;
@@ -50,6 +54,8 @@ public class SslFactory {
 
     private FileWatchService fileWatchService;
 
+    private QuicSslContext quicSslContext;
+
     @PostConstruct
     private void initSslContext() {
         if (!connectConf.isEnableTlsSever()) {
@@ -78,7 +84,13 @@ public class SslFactory {
                 contextBuilder.trustManager(new 
File(connectConf.getSslCaCertFile()));
             }
             sslContext = contextBuilder.build();
-        } catch (IOException e) {
+
+            SelfSignedCertificate selfSignedCertificate = new 
SelfSignedCertificate();
+            quicSslContext = QuicSslContextBuilder.forServer(
+                    selfSignedCertificate.privateKey(), null, 
selfSignedCertificate.certificate())
+                .applicationProtocols("mqtt")
+                .build();
+        } catch (IOException | CertificateException e) {
             throw new RuntimeException("failed to initialize ssl context.", e);
         }
     }
@@ -143,5 +155,9 @@ public class SslFactory {
     public SslContext getSslContext() {
         return sslContext;
     }
+
+    public QuicSslContext getQuicSslContext() {
+        return quicSslContext;
+    }
 }
 
diff --git 
a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java 
b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java
index 133c08f..5bf16ad 100644
--- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java
+++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java
@@ -17,14 +17,19 @@
 
 package org.apache.rocketmq.mqtt.cs.starter;
 
+import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
@@ -33,8 +38,13 @@ import io.netty.handler.codec.mqtt.MqttEncoder;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import javax.annotation.PreDestroy;
-import org.apache.rocketmq.mqtt.cs.channel.AdaptiveTlsHandler;
+import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
+import io.netty.incubator.codec.quic.QuicChannel;
+import io.netty.incubator.codec.quic.QuicServerCodecBuilder;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler;
+import org.apache.rocketmq.mqtt.cs.channel.AdaptiveTlsHandler;
 import org.apache.rocketmq.mqtt.cs.config.ConnectConf;
 import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketDispatcher;
 import org.apache.rocketmq.mqtt.cs.protocol.ssl.SslFactory;
@@ -57,6 +67,8 @@ public class MqttServer {
     private final ServerBootstrap wsServerBootstrap = new ServerBootstrap();
     private final ServerBootstrap tlsServerBootstrap = new ServerBootstrap();
 
+    private final Bootstrap quicBootstrap = new Bootstrap();
+
     @Resource
     private ConnectHandler connectHandler;
 
@@ -86,8 +98,12 @@ public class MqttServer {
         adaptiveTlsHandler = new AdaptiveTlsHandler(TlsMode.PERMISSIVE, 
sslFactory);
 
         start();
-        startWs();
         startTls();
+
+        startWs();
+
+        // QUIC over DTLS
+        startQuic();
     }
 
     @PreDestroy
@@ -113,7 +129,7 @@ public class MqttServer {
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
-                public void initChannel(SocketChannel ch) throws Exception {
+                public void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
                     assembleHandlerPipeline(pipeline);
                 }
@@ -138,7 +154,7 @@ public class MqttServer {
             .localAddress(new InetSocketAddress(tlsPort))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
-                public void initChannel(SocketChannel ch) throws Exception {
+                public void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
                     pipeline.addLast("sslHandler", new 
SslHandler(sslFactory.buildSslEngine(ch)));
                     assembleHandlerPipeline(pipeline);
@@ -181,7 +197,7 @@ public class MqttServer {
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
-                public void initChannel(SocketChannel ch) throws Exception {
+                public void initChannel(SocketChannel ch) {
                     ChannelPipeline pipeline = ch.pipeline();
                     pipeline.addLast(AdaptiveTlsHandler.class.getSimpleName(), 
adaptiveTlsHandler);
                     pipeline.addLast("connectHandler", connectHandler);
@@ -199,4 +215,58 @@ public class MqttServer {
         LOGGER.info("MQTT server for WebSocket started, listening: {}", port);
     }
 
+    private void startQuic() throws InterruptedException {
+        ChannelHandler channelHandler = new QuicServerCodecBuilder()
+            .sslContext(sslFactory.getQuicSslContext())
+            .maxIdleTimeout(5000, TimeUnit.SECONDS)
+            // Configure some limits for the maximal number of streams (and 
the data) that we want to handle.
+            .initialMaxData(10000000)
+            .initialMaxStreamDataBidirectionalLocal(1000000)
+            .initialMaxStreamDataBidirectionalRemote(1000000)
+            .initialMaxStreamsBidirectional(100)
+            .initialMaxStreamsUnidirectional(100)
+            .activeMigration(true)
+
+            // Setup a token handler. In a production system you would want to 
implement and provide your custom
+            // one.
+            .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
+            // ChannelHandler that is added into QuicChannel pipeline.
+            .handler(new ChannelInboundHandlerAdapter() {
+                @Override
+                public void channelActive(ChannelHandlerContext ctx) {
+                    QuicChannel channel = (QuicChannel) ctx.channel();
+                    LOGGER.debug("QUIC Connection Established: remote={}", 
channel.remoteAddress());
+                    // Create streams etc..
+                }
+
+                public void channelInactive(ChannelHandlerContext ctx) {
+                    ((QuicChannel) ctx.channel()).collectStats().addListener(f 
-> {
+                        if (f.isSuccess()) {
+                            LOGGER.info("Connection closed: {}", f.getNow());
+                        }
+                    });
+                }
+
+                @Override
+                public boolean isSharable() {
+                    return true;
+                }
+            })
+            .streamHandler(new ChannelInitializer<QuicStreamChannel>() {
+                @Override
+                protected void initChannel(QuicStreamChannel ch) {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    assembleHandlerPipeline(pipeline);
+                }
+            })
+            .build();
+
+        quicBootstrap.group(workerEventLoopGroup)
+            .channel(NioDatagramChannel.class)
+            .handler(channelHandler)
+            .bind(new InetSocketAddress(connectConf.getQuicPort()))
+            .sync();
+        LOGGER.info("MQTT server for QUIC over DTLS started, listening: {}", 
connectConf.getQuicPort());
+    }
+
 }
diff --git a/pom.xml b/pom.xml
index 6826ab6..e863aa3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,6 +49,7 @@
         <jraft-core.version>1.3.11</jraft-core.version>
         <netty.version>4.1.43.Final</netty.version>
         <netty.tcnative.version>2.0.26.Final</netty.tcnative.version>
+        <netty.quic.version>0.0.62.Final</netty.quic.version>
     </properties>
 
     <dependencyManagement>
@@ -103,6 +104,12 @@
                 <artifactId>netty-tcnative-boringssl-static</artifactId>
                 <version>${netty.tcnative.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.netty.incubator</groupId>
+                <artifactId>netty-incubator-codec-native-quic</artifactId>
+                <version>${netty.quic.version}</version>
+                <classifier>${os.detected.classifier}</classifier>
+            </dependency>
             <dependency>
                 <groupId>org.springframework</groupId>
                 <artifactId>spring-core</artifactId>
@@ -262,6 +269,13 @@
     </dependencyManagement>
 
     <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.7.0</version>
+            </extension>
+        </extensions>
         <plugins>
             <plugin>
                 <artifactId>maven-resources-plugin</artifactId>

Reply via email to