Repository: incubator-rocketmq Updated Branches: refs/heads/tls [created] 1cf9099e9
Add TLS Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/66b5c724 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/66b5c724 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/66b5c724 Branch: refs/heads/tls Commit: 66b5c7241651f54f7d483f99278cda537b5e4aac Parents: c4a3e0c Author: Li Zhanhui <[email protected]> Authored: Tue Jun 13 17:15:54 2017 +0800 Committer: Li Zhanhui <[email protected]> Committed: Tue Jun 13 17:15:54 2017 +0800 ---------------------------------------------------------------------- .../apache/rocketmq/broker/BrokerStartup.java | 1 + .../apache/rocketmq/client/ClientConfig.java | 14 ++- .../client/impl/factory/MQClientInstance.java | 1 + .../rocketmq/common/protocol/RequestCode.java | 4 +- remoting/pom.xml | 18 +++ .../rocketmq/remoting/RemotingClient.java | 14 +-- .../remoting/netty/FileRegionEncoder.java | 76 ++++++++++++ .../remoting/netty/NettyClientConfig.java | 10 ++ .../remoting/netty/NettyRemotingAbstract.java | 6 + .../remoting/netty/NettyRemotingClient.java | 37 ++++-- .../remoting/netty/NettyRemotingServer.java | 72 ++++++++++-- .../remoting/netty/NettySystemConfig.java | 16 ++- .../rocketmq/remoting/netty/SslHelper.java | 115 +++++++++++++++++++ .../protocol/RemotingSysRequestCode.java | 26 +++++ 14 files changed, 385 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 85d2e3a..dbea561 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -95,6 +95,7 @@ public class BrokerStartup { final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); + nettyClientConfig.setUseTLS(NettySystemConfig.enableSSL); nettyServerConfig.setListenPort(10911); final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 950d756..8f255f0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -45,6 +45,8 @@ public class ClientConfig { private String unitName; private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); + private boolean useTLS; + public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); @@ -92,6 +94,7 @@ public class ClientConfig { this.unitMode = cc.unitMode; this.unitName = cc.unitName; this.vipChannelEnabled = cc.vipChannelEnabled; + this.useTLS = cc.useTLS; } public ClientConfig cloneClientConfig() { @@ -106,6 +109,7 @@ public class ClientConfig { cc.unitMode = unitMode; cc.unitName = unitName; cc.vipChannelEnabled = vipChannelEnabled; + cc.useTLS = useTLS; return cc; } @@ -173,12 +177,20 @@ public class ClientConfig { this.vipChannelEnabled = vipChannelEnabled; } + public boolean isUseTLS() { + return useTLS; + } + + public void setUseTLS(boolean useTLS) { + this.useTLS = useTLS; + } + @Override public String toString() { return 
"ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled=" - + vipChannelEnabled + "]"; + + vipChannelEnabled + ", useTLS=" + useTLS + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f146be9..463b2ce 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -127,6 +127,7 @@ public class MQClientInstance { this.instanceIndex = instanceIndex; this.nettyClientConfig = new NettyClientConfig(); this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads()); + this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); this.clientRemotingProcessor = new ClientRemotingProcessor(this); this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 6f132f7..e8d87b7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -17,7 +17,9 @@ package org.apache.rocketmq.common.protocol; -public class RequestCode { +import org.apache.rocketmq.remoting.protocol.RemotingSysRequestCode; + +public class RequestCode extends RemotingSysRequestCode { public static final int SEND_MESSAGE = 10; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/pom.xml ---------------------------------------------------------------------- diff --git a/remoting/pom.xml b/remoting/pom.xml index 1552341..413b13d 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -45,5 +45,23 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + <version>2.0.0.Final</version> + <classifier>${os.detected.classifier}</classifier> + <optional>true</optional> + </dependency> </dependencies> + + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.4.0.Final</version> + </extension> + </extensions> + </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java index 276a565..b527408 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java @@ -27,24 +27,24 @@ import 
org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface RemotingClient extends RemotingService { - public void updateNameServerAddressList(final List<String> addrs); + void updateNameServerAddressList(final List<String> addrs); - public List<String> getNameServerAddressList(); + List<String> getNameServerAddressList(); - public RemotingCommand invokeSync(final String addr, final RemotingCommand request, + RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException; - public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, + void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; - public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) + void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; - public void registerProcessor(final int requestCode, final NettyRequestProcessor processor, + void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); - public boolean isChannelWriteable(final String addr); + boolean isChannelWritable(final String addr); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java new file mode 100644 index 0000000..c7e5af4 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.FileRegion; +import io.netty.handler.codec.MessageToByteEncoder; + +import io.netty.handler.ssl.SslHandler; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +/** + * <p> + * By default, file regions are directly transferred to the socket channel, which is known as zero copy. In case we need + * to encrypt transmission, data being sent should go through the {@link SslHandler}. This encoder ensures this + * process. + * </p> + */ +public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> { + + /** + * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that + * can be handled by this encoder.
+ * + * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link + * io.netty.handler.codec.MessageToByteEncoder} belongs to + * @param msg the message to encode + * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written + * @throws Exception is thrown if an error occurs + */ + @Override + protected void encode(ChannelHandlerContext ctx, FileRegion msg, final ByteBuf out) throws Exception { + WritableByteChannel writableByteChannel = new WritableByteChannel() { + @Override + public int write(ByteBuffer src) throws IOException { + out.writeBytes(src); + return out.capacity(); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() throws IOException { + } + }; + + while (true) { + long position = msg.transfered(); + msg.transferTo(writableByteChannel, position); + if (msg.count() == 0) { + break; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java index 9edaa54..fbc071b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java @@ -38,6 +38,8 @@ public class NettyClientConfig { private boolean clientPooledByteBufAllocatorEnable = false; private boolean clientCloseSocketIfTimeout = false; + private boolean useTLS; + public boolean isClientCloseSocketIfTimeout() { return clientCloseSocketIfTimeout; } @@ -125,4 +127,12 @@ public class NettyClientConfig { public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) { 
this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable; } + + public boolean isUseTLS() { + return useTLS; + } + + public void setUseTLS(boolean useTLS) { + this.useTLS = useTLS; + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 0ba714a..73fcee0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.ssl.SslContext; import java.net.SocketAddress; import java.util.HashMap; import java.util.Iterator; @@ -89,6 +90,11 @@ public abstract class NettyRemotingAbstract { protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor; /** + * SSL Context. + */ + protected SslContext sslContext; + + /** * Constructor, specifying capacity of one-way and asynchronous semaphores. * @param permitsOneway Number of permits for one-way requests. * @param permitsAsync Number of permits for asynchronous requests. 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 1c3da9a..9f2d062 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; @@ -34,6 +35,7 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.net.SocketAddress; +import java.security.cert.CertificateException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import javax.net.ssl.SSLException; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -120,6 +123,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); } }); + + if (NettySystemConfig.enableSSL) { + try { + sslContext = 
SslHelper.buildSslContext(true); + log.info("SSL enabled for client"); + } catch (SSLException e) { + log.error("Failed to create SSLContext", e); + } catch (CertificateException e) { + log.error("Failed to create SSLContext", e); + throw new RuntimeException("Failed to create SSLContext", e); + } + } } private static int initValueIndex() { @@ -151,7 +166,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( + ChannelPipeline pipeline = ch.pipeline(); + if (nettyClientConfig.isUseTLS() && null != sslContext) { + pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); + log.info("Prepend SSL handler"); + } + pipeline.addLast( defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), @@ -421,17 +441,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private Channel createChannel(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + cw.getChannel().close(); + channelTables.remove(addr); } if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { - boolean createNewConnection = false; + boolean createNewConnection; cw = this.channelTables.get(addr); if (cw != null) { if (cw.isOK()) { - return cw.getChannel(); + cw.getChannel().close(); + this.channelTables.remove(addr); + createNewConnection = true; } else if (!cw.getChannelFuture().isDone()) { createNewConnection = false; } else { @@ -530,10 +553,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } @Override - public boolean isChannelWriteable(String addr) { + public boolean isChannelWritable(String addr) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - 
return cw.isWriteable(); + return cw.isWritable(); } return true; } @@ -569,7 +592,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return this.channelFuture.channel() != null && this.channelFuture.channel().isActive(); } - public boolean isWriteable() { + public boolean isWritable() { return this.channelFuture.channel().isWritable(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index a9a55ab..70e5bae 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.netty; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -37,12 +38,14 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.net.InetSocketAddress; +import java.security.cert.CertificateException; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import javax.net.ssl.SSLException; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -74,6 +77,10 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti private int port = 0; + private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; + private static final String TLS_HANDLER_NAME = "sslHandler"; + private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; + public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { this(nettyServerConfig, null); } @@ -128,6 +135,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } }); } + + if (NettySystemConfig.enableSSL) { + try { + sslContext = SslHelper.buildSslContext(false); + log.info("SSL enabled for server"); + } catch (CertificateException e) { + log.error("Failed to create SSLContext for server", e); + throw new RuntimeException(e); + } catch (SSLException e) { + log.error("Failed to create SSLContext for server", e); + throw new RuntimeException(e); + } + } } private boolean useEpoll() { @@ -163,13 +183,15 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - defaultEventExecutorGroup, - new NettyEncoder(), - new NettyDecoder(), - new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - new NettyConnectManageHandler(), - new NettyServerHandler()); + ch.pipeline() + .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) + .addLast(defaultEventExecutorGroup, + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + new NettyConnectManageHandler(), + new NettyServerHandler() + ); } }); @@ -297,6 +319,42 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return this.publicExecutor; } + class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private static final byte HANDSHAKE_MAGIC_CODE = 0x16; 
+ + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + + // mark the current position so that we can peek the first byte to determine if the content is starting with + // TLS handshake + msg.markReaderIndex(); + + byte b = msg.getByte(0); + + if (b == HANDSHAKE_MAGIC_CODE) { + if (null != sslContext) { + ctx.pipeline() + .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc())) + .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder()); + log.info("SSL handler prepended to channel pipeline"); + } else { + ctx.close(); + log.error("Requiring SSL handler but sslContext is being null"); + } + } + + // reset the reader index so that handshake negotiation may proceed as normal. + msg.resetReaderIndex(); + + // Remove this handler + ctx.pipeline().remove(HANDSHAKE_HANDLER_NAME); + + // Hand over this message to the next handler. + ctx.fireChannelRead(msg.retain()); + } + } + class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java index 52556fc..4a071c5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java @@ -28,9 +28,15 @@ public class NettySystemConfig { "com.rocketmq.remoting.clientAsyncSemaphoreValue"; public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = // "com.rocketmq.remoting.clientOnewaySemaphoreValue"; + + public static final
String ORG_APACHE_ROCKETMQ_REMOTING_SSL_ENABLE = // + "org.apache.rocketmq.remoting.ssl.enable"; + + public static final String ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE = // + "org.apache.rocketmq.remoting.ssl.config.file"; + public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // - Boolean - .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); + Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = // Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535")); public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = // @@ -39,4 +45,10 @@ public class NettySystemConfig { Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); public static int socketRcvbufSize = // Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); + + public static boolean enableSSL = // + Boolean.parseBoolean(System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_ENABLE, "true")); + + public static String sslConfigFile = // + System.getProperty(ORG_APACHE_ROCKETMQ_REMOTING_SSL_CONFIG_FILE, "/etc/rocketmq/ssl.properties"); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java new file mode 100644 index 0000000..95bcdc4 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/SslHelper.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.cert.CertificateException; +import java.util.Properties; +import javax.net.ssl.SSLException; + +public class SslHelper { + + public static SslContext buildSslContext(boolean forClient) throws SSLException, CertificateException { + + File configFile = new File(NettySystemConfig.sslConfigFile); + boolean testMode = !(configFile.exists() && configFile.isFile() && configFile.canRead()); + Properties properties = null; + + if (!testMode) { + properties = new Properties(); + InputStream inputStream = null; + try { + inputStream = new FileInputStream(configFile); + properties.load(inputStream); + } catch (FileNotFoundException ignore) { + } catch (IOException ignore) { + } finally { + if (null != inputStream) { + try { + inputStream.close(); 
+ } catch (IOException ignore) { + } + } + } + } + + SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK; + + if (forClient) { + if (testMode) { + return SslContextBuilder + .forClient() + .sslProvider(SslProvider.JDK) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + } else { + return SslContextBuilder.forClient() + .sslProvider(provider) + .trustManager(new File(properties.getProperty("trustManager"))) + .keyManager( + properties.containsKey("client.keyCertChainFile") ? new File(properties.getProperty("client.keyCertChainFile")) : null, + properties.containsKey("client.keyFile") ? new File(properties.getProperty("client.key")) : null, + properties.containsKey("client.password") ? properties.getProperty("client.password") : null) + .build(); + } + } else { + + if (testMode) { + SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); + return SslContextBuilder + .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()) + .sslProvider(SslProvider.JDK) + .clientAuth(ClientAuth.OPTIONAL) + .build(); + } else { + return SslContextBuilder.forServer( + properties.containsKey("server.keyCertChainFile") ? new File(properties.getProperty("server.keyCertChainFile")) : null, + properties.containsKey("server.keyFile") ? new File(properties.getProperty("server.key")) : null, + properties.containsKey("server.password") ? 
properties.getProperty("server.password") : null) + .sslProvider(provider) + .trustManager(new File(properties.getProperty("server.trustManager"))) + .clientAuth(parseClientAuthMode(properties.getProperty("server.auth.client"))) + .build(); + } + } + } + + private static ClientAuth parseClientAuthMode(String authMode) { + if (null == authMode || authMode.trim().isEmpty()) { + return ClientAuth.NONE; + } + + if ("optional".equalsIgnoreCase(authMode)) { + return ClientAuth.OPTIONAL; + } + + return ClientAuth.REQUIRE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/66b5c724/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java new file mode 100644 index 0000000..32783fa --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSysRequestCode.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.remoting.protocol; + +public class RemotingSysRequestCode { + + /** + * Request to negotiate upgrading connection to TLS + */ + public static final int START_TLS = 1; +}
