This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch ssl_between_nodes in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9f77e329a87b4d98ed28642096a5840814291bb5 Author: HTHou <[email protected]> AuthorDate: Thu Jul 17 10:02:29 2025 +0800 dev --- .../iotdb/rpc/NettyTNonBlockingTransport.java | 52 ++++++++++------------ 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java index 1996a21f6a3..7584ac68792 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java @@ -81,7 +81,6 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { private final String keystorePassword; private final String truststorePath; private final String truststorePassword; - private boolean sslEnabled = false; public NettyTNonBlockingTransport( String host, @@ -100,7 +99,6 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { this.keystorePassword = keystorePassword; this.truststorePath = truststorePath; this.truststorePassword = truststorePassword; - this.sslEnabled = true; initBootstrap(); } @@ -118,27 +116,23 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { logger.info("Initializing channel for {}:{}", host, port); ChannelPipeline pipeline = ch.pipeline(); - - // 添加 SSL 处理器(如果启用) - if (sslEnabled) { - SslContext sslContext = createSslContext(); - SslHandler sslHandler = sslContext.newHandler(ch.alloc(), host, port); - // 增加握手超时时间 - sslHandler.setHandshakeTimeoutMillis(30000); - - pipeline.addLast("ssl", sslHandler); - // 添加SSL握手完成监听器 - sslHandler - .handshakeFuture() - .addListener( - future -> { - if (future.isSuccess()) { - logger.info("SSL handshake completed successfully"); - } else { - logger.info("SSL handshake failed: ", future.cause()); - } - }); - } + SslContext sslContext = createSslContext(); + SslHandler sslHandler = sslContext.newHandler(ch.alloc(), host, port); + // 增加握手超时时间 + sslHandler.setHandshakeTimeoutMillis(30000); + + pipeline.addLast("ssl", sslHandler); + // 添加SSL握手完成监听器 + sslHandler + .handshakeFuture() + .addListener( + future -> { + if (future.isSuccess()) { + logger.info("SSL handshake completed successfully"); + } else { + logger.info("SSL handshake failed: {}", future.cause().getMessage()); + } + }); // 添加业务处理器 pipeline.addLast("handler", new NettyTransportHandler()); @@ -194,11 +188,11 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { } try { - ByteBuf byteBuf = readQueue.take(); - // if (byteBuf == null) { - // logger.info("No data available for ByteBuffer read (non-blocking)"); - // return 0; // 非阻塞读取,没有数据时返回 0 - // } + ByteBuf byteBuf = readQueue.poll(); + if (byteBuf == null) { + logger.info("No data available for ByteBuffer read (non-blocking)"); + return 0; // 非阻塞读取,没有数据时返回 0 + } int available = Math.min(buffer.remaining(), byteBuf.readableBytes()); if (available > 0) { @@ -344,7 +338,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { connecting.set(false); }); future.get(); - return false; // 异步连接,立即返回 false + return true; // 异步连接,立即返回 false } catch (Exception e) { connecting.set(false); // throw new IOException("Failed to start connection", e);
