This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
commit 90cfe7c90647e738963fe1e21489aff19865742d Author: Li Zhanhui <lizhan...@gmail.com> AuthorDate: Fri Apr 12 19:13:03 2024 +0800 feat: add experimental support for QUIC transport --- docs/quic.md | 16 +++++ mqtt-common/pom.xml | 5 ++ .../rocketmq/mqtt/common/util/NetworkHelper.java | 35 ++++++++++ mqtt-cs/pom.xml | 5 ++ .../rocketmq/mqtt/cs/config/ConnectConf.java | 23 +++++++ .../rocketmq/mqtt/cs/protocol/ssl/SslFactory.java | 18 ++++- .../rocketmq/mqtt/cs/starter/MqttServer.java | 80 ++++++++++++++++++++-- pom.xml | 14 ++++ 8 files changed, 190 insertions(+), 6 deletions(-) diff --git a/docs/quic.md b/docs/quic.md new file mode 100644 index 0000000..025d71a --- /dev/null +++ b/docs/quic.md @@ -0,0 +1,16 @@ +## Setup +Download emqtt-bench with QUIC feature +```shell +wget https://github.com/emqx/emqtt-bench/releases/download/0.4.18/emqtt-bench-0.4.18-macos13-amd64-quic.zip +``` + +## Test Connect +```shell +./bin/emqtt_bench conn -u user0 -P secret0 -V 4 --quic true -h localhost -p 14567 -c 10 +``` + +### Pub + +```shell +./bin/emqtt_bench pub -u user0 -P secret0 -V 4 --quic true -h localhost -p 14567 -c 1 -t T_Event/test +``` diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml index 1edfa50..47d1c0d 100644 --- a/mqtt-common/pom.xml +++ b/mqtt-common/pom.xml @@ -68,6 +68,11 @@ <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> + <dependency> + <groupId>io.netty.incubator</groupId> + <artifactId>netty-incubator-codec-native-quic</artifactId> + <classifier>${os.detected.classifier}</classifier> + </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> diff --git a/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/NetworkHelper.java b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/NetworkHelper.java new file mode 100644 index 0000000..769cd89 --- /dev/null +++ b/mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/NetworkHelper.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.common.util; + +import io.netty.incubator.codec.quic.QuicStreamAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import org.apache.rocketmq.common.utils.NetworkUtil; + +public class NetworkHelper { + public static String socketAddressToString(SocketAddress address) { + if (address instanceof InetSocketAddress) { + return NetworkUtil.socketAddress2String(address); + } else if (address instanceof QuicStreamAddress) { + QuicStreamAddress quicAddress = (QuicStreamAddress) address; + return String.format("quic[%d]", quicAddress.streamId()); + } + throw new RuntimeException("Unsupported SocketAddress type"); + } +} diff --git a/mqtt-cs/pom.xml b/mqtt-cs/pom.xml index c1c4521..da12f23 100644 --- a/mqtt-cs/pom.xml +++ b/mqtt-cs/pom.xml @@ -34,6 +34,11 @@ <groupId>io.netty</groupId> <artifactId>netty-tcnative-boringssl-static</artifactId> </dependency> + <dependency> + <groupId>io.netty.incubator</groupId> + <artifactId>netty-incubator-codec-native-quic</artifactId> + <classifier>${os.detected.classifier}</classifier> + </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java index f558023..dcab351 100644 --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/config/ConnectConf.java @@ -32,9 +32,24 @@ public class ConnectConf { private File confFile; private int nettySelectorThreadNum = 1; private int nettyWorkerThreadNum = Runtime.getRuntime().availableProcessors() * 2; + + /** + * TCP ports 8883 and 1883 are registered with IANA for MQTT TLS and non-TLS communication respectively. + */ private int mqttPort = 1883; + + /** + * TCP ports 8883 and 1883 are registered with IANA for MQTT TLS and non-TLS communication respectively. + */ private int mqttTlsPort = 8883; + + /** + * Uses this port for both normal WebSocket connections and WebSocket connections over TLS as well. + */ private int mqttWsPort = 8888; + + private int quicPort = 14567; + private boolean enableTlsSever = false; private boolean needClientAuth = false; private String sslCaCertFile; @@ -276,4 +291,12 @@ public class ConnectConf { public void setMaxTransferCountOnMessageInDisk(int maxTransferCountOnMessageInDisk) { this.maxTransferCountOnMessageInDisk = maxTransferCountOnMessageInDisk; } + + public int getQuicPort() { + return quicPort; + } + + public void setQuicPort(int quicPort) { + this.quicPort = quicPort; + } } diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/ssl/SslFactory.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/ssl/SslFactory.java index 08d776e..f214eae 100644 --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/ssl/SslFactory.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/ssl/SslFactory.java @@ -24,6 +24,10 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; import org.apache.commons.lang3.StringUtils; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.incubator.codec.quic.QuicSslContext; +import io.netty.incubator.codec.quic.QuicSslContextBuilder; +import java.security.cert.CertificateException; import org.apache.rocketmq.mqtt.cs.config.ConnectConf; import org.apache.rocketmq.srvutil.FileWatchService; import org.slf4j.Logger; @@ -50,6 +54,8 @@ public class SslFactory { private FileWatchService fileWatchService; + private QuicSslContext quicSslContext; + @PostConstruct private void initSslContext() { if (!connectConf.isEnableTlsSever()) { @@ -78,7 +84,13 @@ public class SslFactory { contextBuilder.trustManager(new File(connectConf.getSslCaCertFile())); } sslContext = contextBuilder.build(); - } catch (IOException e) { + + SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); + quicSslContext = QuicSslContextBuilder.forServer( + selfSignedCertificate.privateKey(), null, selfSignedCertificate.certificate()) + .applicationProtocols("mqtt") + .build(); + } catch (IOException | CertificateException e) { throw new RuntimeException("failed to initialize ssl context.", e); } } @@ -143,5 +155,9 @@ public class SslFactory { public SslContext getSslContext() { return sslContext; } + + public QuicSslContext getQuicSslContext() { + return quicSslContext; + } } diff --git a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java index 133c08f..5bf16ad 100644 --- a/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java +++ b/mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/starter/MqttServer.java @@ -17,14 +17,19 @@ package org.apache.rocketmq.mqtt.cs.starter; +import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; @@ -33,8 +38,13 @@ import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedWriteHandler; import javax.annotation.PreDestroy; -import org.apache.rocketmq.mqtt.cs.channel.AdaptiveTlsHandler; +import io.netty.incubator.codec.quic.InsecureQuicTokenHandler; +import io.netty.incubator.codec.quic.QuicChannel; +import io.netty.incubator.codec.quic.QuicServerCodecBuilder; +import io.netty.incubator.codec.quic.QuicStreamChannel; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.mqtt.cs.channel.ConnectHandler; +import org.apache.rocketmq.mqtt.cs.channel.AdaptiveTlsHandler; import org.apache.rocketmq.mqtt.cs.config.ConnectConf; import org.apache.rocketmq.mqtt.cs.protocol.mqtt.MqttPacketDispatcher; import org.apache.rocketmq.mqtt.cs.protocol.ssl.SslFactory; @@ -57,6 +67,8 @@ public class MqttServer { private final ServerBootstrap wsServerBootstrap = new ServerBootstrap(); private final ServerBootstrap tlsServerBootstrap = new ServerBootstrap(); + private final Bootstrap quicBootstrap = new Bootstrap(); + @Resource private ConnectHandler connectHandler; @@ -86,8 +98,12 @@ public class MqttServer { adaptiveTlsHandler = new AdaptiveTlsHandler(TlsMode.PERMISSIVE, sslFactory); start(); - startWs(); startTls(); + + startWs(); + + // QUIC over DTLS + startQuic(); } @PreDestroy @@ -113,7 +129,7 @@ public class MqttServer { .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); assembleHandlerPipeline(pipeline); } @@ -138,7 +154,7 @@ public class MqttServer { .localAddress(new InetSocketAddress(tlsPort)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("sslHandler", new SslHandler(sslFactory.buildSslEngine(ch))); assembleHandlerPipeline(pipeline); @@ -181,7 +197,7 @@ public class MqttServer { .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(AdaptiveTlsHandler.class.getSimpleName(), adaptiveTlsHandler); pipeline.addLast("connectHandler", connectHandler); @@ -199,4 +215,58 @@ public class MqttServer { LOGGER.info("MQTT server for WebSocket started, listening: {}", port); } + private void startQuic() throws InterruptedException { + ChannelHandler channelHandler = new QuicServerCodecBuilder() + .sslContext(sslFactory.getQuicSslContext()) + .maxIdleTimeout(5000, TimeUnit.SECONDS) + // Configure some limits for the maximal number of streams (and the data) that we want to handle. + .initialMaxData(10000000) + .initialMaxStreamDataBidirectionalLocal(1000000) + .initialMaxStreamDataBidirectionalRemote(1000000) + .initialMaxStreamsBidirectional(100) + .initialMaxStreamsUnidirectional(100) + .activeMigration(true) + + // Setup a token handler. In a production system you would want to implement and provide your custom + // one. + .tokenHandler(InsecureQuicTokenHandler.INSTANCE) + // ChannelHandler that is added into QuicChannel pipeline. + .handler(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + QuicChannel channel = (QuicChannel) ctx.channel(); + LOGGER.debug("QUIC Connection Established: remote={}", channel.remoteAddress()); + // Create streams etc.. + } + + public void channelInactive(ChannelHandlerContext ctx) { + ((QuicChannel) ctx.channel()).collectStats().addListener(f -> { + if (f.isSuccess()) { + LOGGER.info("Connection closed: {}", f.getNow()); + } + }); + } + + @Override + public boolean isSharable() { + return true; + } + }) + .streamHandler(new ChannelInitializer<QuicStreamChannel>() { + @Override + protected void initChannel(QuicStreamChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + assembleHandlerPipeline(pipeline); + } + }) + .build(); + + quicBootstrap.group(workerEventLoopGroup) + .channel(NioDatagramChannel.class) + .handler(channelHandler) + .bind(new InetSocketAddress(connectConf.getQuicPort())) + .sync(); + LOGGER.info("MQTT server for QUIC over DTLS started, listening: {}", connectConf.getQuicPort()); + } + } diff --git a/pom.xml b/pom.xml index 6826ab6..e863aa3 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,7 @@ <jraft-core.version>1.3.11</jraft-core.version> <netty.version>4.1.43.Final</netty.version> <netty.tcnative.version>2.0.26.Final</netty.tcnative.version> + <netty.quic.version>0.0.62.Final</netty.quic.version> </properties> <dependencyManagement> @@ -103,6 +104,12 @@ <artifactId>netty-tcnative-boringssl-static</artifactId> <version>${netty.tcnative.version}</version> </dependency> + <dependency> + <groupId>io.netty.incubator</groupId> + <artifactId>netty-incubator-codec-native-quic</artifactId> + <version>${netty.quic.version}</version> + <classifier>${os.detected.classifier}</classifier> + </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> @@ -262,6 +269,13 @@ </dependencyManagement> <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.7.0</version> + </extension> + </extensions> <plugins> <plugin> <artifactId>maven-resources-plugin</artifactId>