This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch ssl_between_nodes
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9f77e329a87b4d98ed28642096a5840814291bb5
Author: HTHou <[email protected]>
AuthorDate: Thu Jul 17 10:02:29 2025 +0800

    dev
---
 .../iotdb/rpc/NettyTNonBlockingTransport.java      | 52 ++++++++++------------
 1 file changed, 23 insertions(+), 29 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
index 1996a21f6a3..7584ac68792 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
@@ -81,7 +81,6 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport {
   private final String keystorePassword;
   private final String truststorePath;
   private final String truststorePassword;
-  private boolean sslEnabled = false;
 
   public NettyTNonBlockingTransport(
       String host,
@@ -100,7 +99,6 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport {
     this.keystorePassword = keystorePassword;
     this.truststorePath = truststorePath;
     this.truststorePassword = truststorePassword;
-    this.sslEnabled = true;
     initBootstrap();
   }
 
@@ -118,27 +116,23 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport {
                 logger.info("Initializing channel for {}:{}", host, port);
 
                 ChannelPipeline pipeline = ch.pipeline();
-
-                // 添加 SSL 处理器(如果启用)
-                if (sslEnabled) {
-                  SslContext sslContext = createSslContext();
-                  SslHandler sslHandler = sslContext.newHandler(ch.alloc(), host, port);
-                  // 增加握手超时时间
-                  sslHandler.setHandshakeTimeoutMillis(30000);
-
-                  pipeline.addLast("ssl", sslHandler);
-                  // 添加SSL握手完成监听器
-                  sslHandler
-                      .handshakeFuture()
-                      .addListener(
-                          future -> {
-                            if (future.isSuccess()) {
-                              logger.info("SSL handshake completed successfully");
-                            } else {
-                              logger.info("SSL handshake failed: ", future.cause());
-                            }
-                          });
-                }
+                SslContext sslContext = createSslContext();
+                SslHandler sslHandler = sslContext.newHandler(ch.alloc(), host, port);
+                // 增加握手超时时间
+                sslHandler.setHandshakeTimeoutMillis(30000);
+
+                pipeline.addLast("ssl", sslHandler);
+                // 添加SSL握手完成监听器
+                sslHandler
+                    .handshakeFuture()
+                    .addListener(
+                        future -> {
+                          if (future.isSuccess()) {
+                            logger.info("SSL handshake completed successfully");
+                          } else {
+                            logger.info("SSL handshake failed: {}", future.cause().getMessage());
+                          }
+                        });
 
                 // 添加业务处理器
                 pipeline.addLast("handler", new NettyTransportHandler());
@@ -194,11 +188,11 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport {
     }
 
     try {
-      ByteBuf byteBuf = readQueue.take();
-      //      if (byteBuf == null) {
-      //        logger.info("No data available for ByteBuffer read (non-blocking)");
-      //        return 0; // 非阻塞读取,没有数据时返回 0
-      //      }
+      ByteBuf byteBuf = readQueue.poll();
+      if (byteBuf == null) {
+        logger.info("No data available for ByteBuffer read (non-blocking)");
+        return 0; // 非阻塞读取,没有数据时返回 0
+      }
 
       int available = Math.min(buffer.remaining(), byteBuf.readableBytes());
       if (available > 0) {
@@ -344,7 +338,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport {
                 connecting.set(false);
               });
       future.get();
-      return false; // 异步连接,立即返回 false
+      return true; // 异步连接,立即返回 false
     } catch (Exception e) {
       connecting.set(false);
       //      throw new IOException("Failed to start connection", e);

Reply via email to