This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch ssl_between_nodes
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1ce4494eccf814586bf0720e461b740b17ffb80c
Author: HTHou <[email protected]>
AuthorDate: Thu Jul 17 16:02:06 2025 +0800

    Grok4 is powerful
---
 .../iotdb/rpc/NettyTNonBlockingTransport.java      | 522 +++++++++------------
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   4 +-
 .../manager/load/service/HeartbeatService.java     |  20 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |   2 +-
 4 files changed, 239 insertions(+), 309 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
index 7584ac68792..1f2456c4386 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
@@ -48,40 +48,51 @@ import javax.net.ssl.TrustManagerFactory;
 
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
 import java.security.KeyStore;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+/**
+ * A non-blocking Thrift transport implementation using Netty for asynchronous 
I/O. Integrates with
+ * Thrift's TAsyncClientManager using a dummy local SocketChannel for selector 
events. Supports
+ * SSL/TLS for secure communication.
+ */
 public class NettyTNonBlockingTransport extends TNonblockingTransport {
 
   private static final Logger logger = 
LoggerFactory.getLogger(NettyTNonBlockingTransport.class);
 
   private final String host;
   private final int port;
-  private final int connectTimeoutMs = 60000;
+  private final int connectTimeoutMs;
+  private final long sslHandshakeTimeoutMs;
   private final EventLoopGroup eventLoopGroup;
   private final Bootstrap bootstrap;
-  private Channel channel;
+  private volatile Channel channel;
   private final AtomicBoolean connected = new AtomicBoolean(false);
   private final AtomicBoolean connecting = new AtomicBoolean(false);
   private final BlockingQueue<ByteBuf> readQueue = new LinkedBlockingQueue<>();
-  private final Object writeLock = new Object();
-  private NettySelectionKeyAdapter selectionKeyAdapter;
+  private final Object lock = new Object();
 
-  // SSL 配置
+  // SSL configuration
   private final String keystorePath;
   private final String keystorePassword;
   private final String truststorePath;
   private final String truststorePassword;
 
+  // Dummy local socket for selector integration
+  private ServerSocketChannel dummyServer;
+  private java.nio.channels.SocketChannel dummyClient;
+  private java.nio.channels.SocketChannel dummyServerAccepted;
+  private int dummyPort;
+  private Selector selector; // Stored for wakeup if needed
+
   public NettyTNonBlockingTransport(
       String host,
       int port,
@@ -93,15 +104,34 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
     super(new TConfiguration());
     this.host = host;
     this.port = port;
+    this.connectTimeoutMs = 60000;
+    this.sslHandshakeTimeoutMs = 30000;
     this.eventLoopGroup = new NioEventLoopGroup();
     this.bootstrap = new Bootstrap();
     this.keystorePath = keystorePath;
     this.keystorePassword = keystorePassword;
     this.truststorePath = truststorePath;
     this.truststorePassword = truststorePassword;
+    initDummyChannels();
     initBootstrap();
   }
 
+  /** Initializes dummy local channels for selector event simulation. */
+  private void initDummyChannels() throws TTransportException {
+    try {
+      dummyServer = ServerSocketChannel.open();
+      dummyServer.configureBlocking(false);
+      dummyServer.bind(new InetSocketAddress("localhost", 0));
+      dummyPort = dummyServer.socket().getLocalPort();
+      logger.debug("Dummy server bound to localhost:{}", dummyPort);
+
+      dummyClient = java.nio.channels.SocketChannel.open();
+      dummyClient.configureBlocking(false);
+    } catch (IOException e) {
+      throw new TTransportException("Failed to initialize dummy channels", e);
+    }
+  }
+
   private void initBootstrap() {
     bootstrap
         .group(eventLoopGroup)
@@ -113,28 +143,30 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
             new ChannelInitializer<SocketChannel>() {
               @Override
               protected void initChannel(SocketChannel ch) throws Exception {
-                logger.info("Initializing channel for {}:{}", host, port);
+                logger.debug("Initializing channel for {}:{}", host, port);
 
                 ChannelPipeline pipeline = ch.pipeline();
                 SslContext sslContext = createSslContext();
                 SslHandler sslHandler = sslContext.newHandler(ch.alloc(), 
host, port);
-                // 增加握手超时时间
-                sslHandler.setHandshakeTimeoutMillis(30000);
+                sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeoutMs);
 
                 pipeline.addLast("ssl", sslHandler);
-                // 添加SSL握手完成监听器
                 sslHandler
                     .handshakeFuture()
                     .addListener(
                         future -> {
                           if (future.isSuccess()) {
-                            logger.info("SSL handshake completed 
successfully");
+                            logger.info(
+                                "SSL handshake completed successfully for 
{}:{}", host, port);
                           } else {
-                            logger.info("SSL handshake failed: {}", 
future.cause().getMessage());
+                            logger.error(
+                                "SSL handshake failed for {}:{}: {}",
+                                host,
+                                port,
+                                future.cause().getMessage());
                           }
                         });
 
-                // 添加业务处理器
                 pipeline.addLast("handler", new NettyTransportHandler());
               }
             });
@@ -143,7 +175,6 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
   private SslContext createSslContext() throws Exception {
     SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
 
-    // 配置 KeyStore(客户端证书)
     if (keystorePath != null && keystorePassword != null) {
       KeyStore keyStore = KeyStore.getInstance("JKS");
       try (FileInputStream fis = new FileInputStream(keystorePath)) {
@@ -155,7 +186,6 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
       sslContextBuilder.keyManager(kmf);
     }
 
-    // 配置 TrustStore(信任的服务器证书)
     if (truststorePath != null && truststorePassword != null) {
       KeyStore trustStore = KeyStore.getInstance("JKS");
       try (FileInputStream fis = new FileInputStream(truststorePath)) {
@@ -172,40 +202,43 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
 
   @Override
   public boolean isOpen() {
-    return channel != null && channel.isActive() && connected.get();
+    synchronized (lock) {
+      return connected.get() || (channel != null && channel.isActive());
+    }
   }
 
   @Override
   public void open() throws TTransportException {
-    throw new RuntimeException("open() is not implemented for 
NettyTNonblockingTransport");
+    throw new TTransportException(
+        TTransportException.NOT_OPEN, "open() is not implemented; use 
startConnect() instead");
   }
 
-  /** Perform a nonblocking read into buffer. */
+  @Override
   public int read(ByteBuffer buffer) throws TTransportException {
+    boolean readingResponseSize = buffer.remaining() == 4;
+
     if (!isOpen()) {
-      logger.info("Transport not open for ByteBuffer read");
+      logger.debug("Transport not open for ByteBuffer read");
       throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
     }
 
+    ByteBuf byteBuf = null;
     try {
-      ByteBuf byteBuf = readQueue.poll();
+      byteBuf = readQueue.poll();
       if (byteBuf == null) {
-        logger.info("No data available for ByteBuffer read (non-blocking)");
-        return 0; // 非阻塞读取,没有数据时返回 0
+        logger.info("No data available for ByteBuffer read");
+        return 0;
       }
 
       int available = Math.min(buffer.remaining(), byteBuf.readableBytes());
       if (available > 0) {
-        // 从 ByteBuf 读取数据到 ByteBuffer
         byte[] tempArray = new byte[available];
         byteBuf.readBytes(tempArray);
         buffer.put(tempArray);
-
         logger.info(
             "Read {} bytes into ByteBuffer, remaining space: {}", available, 
buffer.remaining());
       }
 
-      // 如果还有剩余数据,创建一个新的 ByteBuf 包含剩余数据
       if (byteBuf.readableBytes() > 0) {
         ByteBuf remaining = byteBuf.slice();
         remaining.retain();
@@ -213,41 +246,58 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
         logger.info("Put back {} remaining bytes", remaining.readableBytes());
       }
 
-      byteBuf.release();
-      return available;
+      // Drain dummy channel to clear OP_READ
+      ByteBuffer discard = ByteBuffer.allocate(16);
+      try {
+        while (dummyClient.read(discard) > 0) {
+          discard.clear();
+        }
+      } catch (IOException e) {
+        logger.warn("Failed to drain dummy channel", e);
+      }
+      if (readingResponseSize) {
+        // Trigger OP_READ on dummy by writing dummy byte
+        if (dummyServerAccepted != null) {
+          ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]);
+          dummyServerAccepted.write(dummyByte);
+        }
+        // Wakeup selector if needed
+        if (selector != null) {
+          selector.wakeup();
+        }
+      }
 
+      return available;
     } catch (Exception e) {
-      logger.warn("ByteBuffer read failed: ", e);
-      throw new TTransportException(TTransportException.UNKNOWN, e);
+      logger.error("ByteBuffer read failed: {}", e.getMessage());
+      throw new TTransportException(TTransportException.UNKNOWN, "Read 
failed", e);
+    } finally {
+      if (byteBuf != null) {
+        byteBuf.release();
+      }
     }
   }
 
-  /** Reads from the underlying input stream if not null. */
   @Override
   public int read(byte[] buf, int off, int len) throws TTransportException {
     if (!isOpen()) {
-      logger.info(
-          "Transport not open for read - channel: "
-              + (channel != null ? channel.isActive() : "null")
-              + ", connected: "
-              + connected.get());
+      logger.debug("Transport not open for read");
       throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
     }
 
     try {
-      // 使用 ByteBuffer 包装数组进行读取
       ByteBuffer buffer = ByteBuffer.wrap(buf, off, len);
       return read(buffer);
     } catch (Exception e) {
-      logger.warn("Read failed: ", e);
-      throw new TTransportException(TTransportException.UNKNOWN, e);
+      logger.error("Read failed: {}", e.getMessage());
+      throw new TTransportException(TTransportException.UNKNOWN, "Read 
failed", e);
     }
   }
 
-  /** Perform a nonblocking write of the data in buffer; */
+  @Override
   public int write(ByteBuffer buffer) throws TTransportException {
     if (!isOpen()) {
-      logger.info("Transport not open for ByteBuffer write");
+      logger.debug("Transport not open for ByteBuffer write");
       throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
     }
 
@@ -256,170 +306,184 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
       return 0;
     }
 
-    logger.info("Writing " + remaining + " bytes from ByteBuffer");
-
-    synchronized (writeLock) {
-      // 创建 ByteBuf 从 ByteBuffer
-      ByteBuf byteBuf = Unpooled.buffer();
-      byteBuf.writeBytes(buffer);
-      ChannelFuture future = channel.writeAndFlush(byteBuf);
+    logger.info("Writing {} bytes from ByteBuffer", remaining);
 
-      final int bytesToWrite = remaining;
-      future.addListener(
-          (GenericFutureListener<ChannelFuture>)
-              future1 -> {
-                if (future1.isSuccess()) {
-                  logger.info(
-                      "ByteBuffer write completed successfully: " + 
bytesToWrite + " bytes");
-                } else {
-                  logger.warn("ByteBuffer write failed: " + 
future1.cause().getMessage());
-                  future1.cause().printStackTrace();
-                }
-              });
+    synchronized (lock) {
+      ByteBuf byteBuf = Unpooled.buffer(remaining);
+      try {
+        byteBuf.writeBytes(buffer);
+        ChannelFuture future = channel.writeAndFlush(byteBuf);
+        future.addListener(
+            (GenericFutureListener<ChannelFuture>)
+                future1 -> {
+                  if (future1.isSuccess()) {
+                    logger.debug("ByteBuffer write completed successfully: {} 
bytes", remaining);
+                  } else {
+                    logger.error("ByteBuffer write failed: {}", 
future1.cause().getMessage());
+                  }
+                });
+        return remaining;
+      } catch (Exception e) {
+        byteBuf.release();
+        logger.error("ByteBuffer write failed: {}", e.getMessage());
+        throw new TTransportException(TTransportException.UNKNOWN, "Write 
failed", e);
+      }
     }
-
-    // 对于非阻塞写入,我们假设所有数据都能写入
-    // 实际的写入状态通过 Future 监听器处理
-    return remaining;
   }
 
-  /** Writes to the underlying output stream if not null. */
   @Override
   public void write(byte[] buf, int off, int len) throws TTransportException {
     if (!isOpen()) {
-      logger.info("Transport not open for write");
+      logger.debug("Transport not open for write");
       throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
     }
 
-    // 使用 ByteBuffer 包装数组进行写入
     ByteBuffer buffer = ByteBuffer.wrap(buf, off, len);
     write(buffer);
   }
 
   @Override
   public void flush() throws TTransportException {
-    // Not supported by SocketChannel.
+    if (!isOpen()) {
+      logger.debug("Transport not open for flush");
+      throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
+    }
+    synchronized (lock) {
+      channel.flush();
+      logger.debug("Flushed channel");
+    }
   }
 
   @Override
   public void close() {
-    connected.set(false);
-    if (channel != null) {
-      channel.close();
+    synchronized (lock) {
+      connected.set(false);
+      if (channel != null) {
+        channel.close();
+        channel = null;
+        logger.info("Channel closed for {}:{}", host, port);
+      }
+      try {
+        if (dummyClient != null) {
+          dummyClient.close();
+        }
+        if (dummyServerAccepted != null) {
+          dummyServerAccepted.close();
+        }
+        if (dummyServer != null) {
+          dummyServer.close();
+        }
+      } catch (IOException e) {
+        logger.warn("Failed to close dummy channels", e);
+      }
+      eventLoopGroup.shutdownGracefully();
+      logger.info("EventLoopGroup shutdown initiated");
     }
-    eventLoopGroup.shutdownGracefully();
   }
 
   @Override
   public boolean startConnect() throws IOException {
     if (connected.get() || connecting.get()) {
-      logger.info("Starting connection return " + (connected.get() || 
connecting.get()));
+      logger.debug("Connection already started or established for {}:{}", 
host, port);
       return connected.get();
     }
 
     if (!connecting.compareAndSet(false, true)) {
+      logger.debug("Concurrent connection attempt detected for {}:{}", host, 
port);
       return false;
     }
-    logger.info("Starting connection to " + host + ":" + port);
+
+    logger.info("Starting connection to {}:{}", host, port);
 
     try {
+      // Initiate dummy connect, it will pend until accept
+      dummyClient.connect(new InetSocketAddress("localhost", dummyPort));
+
+      // Initiate Netty connect
       ChannelFuture future = bootstrap.connect(host, port);
       future.addListener(
           (GenericFutureListener<ChannelFuture>)
               future1 -> {
-                if (future1.isSuccess()) {
-                  logger.info("Connection established successfully");
-                  channel = future1.channel();
-                  connected.set(true);
-                  if (selectionKeyAdapter != null) {
-                    selectionKeyAdapter.setConnected(true);
+                synchronized (lock) {
+                  if (future1.isSuccess()) {
+                    logger.info("Connection established successfully to 
{}:{}", host, port);
+                    channel = future1.channel();
+                    connected.set(true);
+                    // Now accept the dummy connection to complete it
+                    try {
+                      dummyServerAccepted = dummyServer.accept();
+                      if (dummyServerAccepted != null) {
+                        dummyServerAccepted.configureBlocking(false);
+                        logger.debug("Dummy server accepted connection");
+                        // Wakeup selector to detect OP_CONNECT
+                        if (selector != null) {
+                          selector.wakeup();
+                        }
+                      }
+                    } catch (IOException e) {
+                      logger.warn("Failed to accept dummy connection", e);
+                    }
+                  } else {
+                    logger.error(
+                        "Connection failed to {}:{}: {}", host, port, 
future1.cause().getMessage());
                   }
+                  connecting.set(false);
                 }
-                connecting.set(false);
               });
       future.get();
-      return true; // 异步连接,立即返回 false
+      return false; // Return false to indicate pending connect for dummy
     } catch (Exception e) {
       connecting.set(false);
-      //      throw new IOException("Failed to start connection", e);
-      return false;
+      logger.error("Failed to start connection to {}:{}", host, port, 
e.getMessage());
+      throw new IOException("Failed to start connection", e);
     }
   }
 
   @Override
   public boolean finishConnect() throws IOException {
-    return connected.get();
+    synchronized (lock) {
+      boolean dummyFinished = dummyClient.finishConnect();
+      boolean isConnected = connected.get() && dummyFinished;
+      logger.debug(
+          "finishConnect called, netty connected: {}, dummy finished: {}",
+          connected.get(),
+          dummyFinished);
+      return isConnected;
+    }
   }
 
   @Override
   public SelectionKey registerSelector(Selector selector, int interests) 
throws IOException {
-    if (selectionKeyAdapter == null) {
-      selectionKeyAdapter = new NettySelectionKeyAdapter(this, selector, 
interests);
-
-      // 尝试通过反射获取 selectedKeys 的可修改引用
-      try {
-        // 尝试不同的字段名,因为不同的 Selector 实现可能使用不同的字段名
-        String[] possibleFieldNames = {"selectedKeys", "publicSelectedKeys", 
"keys"};
-        Field selectedKeysField = null;
-
-        for (String fieldName : possibleFieldNames) {
-          try {
-            selectedKeysField = 
selector.getClass().getSuperclass().getDeclaredField(fieldName);
-            break;
-          } catch (NoSuchFieldException e) {
-            // 继续尝试下一个字段名
-          }
-        }
-
-        if (selectedKeysField != null) {
-          selectedKeysField.setAccessible(true);
-          Object selectedKeysObj = selectedKeysField.get(selector);
-
-          if (selectedKeysObj instanceof Set) {
-            @SuppressWarnings("unchecked")
-            Set<SelectionKey> selectedKeys = (Set<SelectionKey>) 
selectedKeysObj;
-            selectionKeyAdapter.setSelectedKeysReference(selectedKeys);
-            logger.info("Successfully obtained selectedKeys reference via 
reflection");
-            try {
-              
selectionKeyAdapter.selectedKeysReference.add(selectionKeyAdapter);
-              selector.wakeup();
-            } catch (Exception e) {
-              logger.warn("Failed to add to selectedKeys: " + e.getMessage());
-            }
-          }
-        }
-      } catch (Exception e) {
-        logger.warn("Failed to access selectedKeys via reflection: " + 
e.getMessage());
-        // 继续执行,使用备用方案
-      }
-
-    } else {
-      selectionKeyAdapter.interestOps(interests);
+    synchronized (lock) {
+      this.selector = selector;
+      return dummyClient.register(selector, interests);
     }
-
-    return selectionKeyAdapter;
   }
 
   @Override
   public String toString() {
-    return "[remote: " + getRemoteAddress() + ", local: " + getLocalAddress() 
+ "]";
+    synchronized (lock) {
+      return "[remote: " + getRemoteAddress() + ", local: " + 
getLocalAddress() + "]";
+    }
+  }
+
+  public SocketAddress getRemoteAddress() {
+    synchronized (lock) {
+      return channel != null ? channel.remoteAddress() : null;
+    }
+  }
+
+  public SocketAddress getLocalAddress() {
+    synchronized (lock) {
+      return channel != null ? channel.localAddress() : null;
+    }
   }
 
-  // Netty 处理器
   private class NettyTransportHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-      logger.info("Channel active: " + ctx.channel().remoteAddress());
-
-      connected.set(true);
-
-      // 更新 SelectionKey 状态
-      if (selectionKeyAdapter != null) {
-        selectionKeyAdapter.setConnected(true);
-        selectionKeyAdapter.setReadReady(true);
-      }
-
+      logger.info("Channel active: {}", ctx.channel().remoteAddress());
       super.channelActive(ctx);
     }
 
@@ -427,172 +491,40 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
       if (msg instanceof ByteBuf) {
         ByteBuf byteBuf = (ByteBuf) msg;
-        logger.info("Received " + byteBuf.readableBytes() + " bytes");
-
-        // 保留引用计数,将数据放入读取队列
-        readQueue.offer(byteBuf.retain());
-        byteBuf.release(); // 释放原始引用
-
-        // 通知选择器适配器有数据可读
-        if (selectionKeyAdapter != null) {
-          selectionKeyAdapter.setReadReady(true);
+        logger.info("Received {} bytes", byteBuf.readableBytes());
+
+        synchronized (lock) {
+          readQueue.offer(byteBuf.retain());
+          // Trigger OP_READ on dummy by writing dummy byte
+          if (dummyServerAccepted != null) {
+            ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]);
+            dummyServerAccepted.write(dummyByte);
+          }
+          // Wakeup selector if needed
+          if (selector != null) {
+            selector.wakeup();
+          }
         }
+        byteBuf.release();
       }
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-      logger.info("Channel inactive");
-
-      connected.set(false);
-      connecting.set(false);
-
-      // 更新 SelectionKey 状态
-      if (selectionKeyAdapter != null) {
-        selectionKeyAdapter.setConnected(false);
-        selectionKeyAdapter.setReadReady(false);
+      logger.info("Channel inactive: {}", ctx.channel().remoteAddress());
+      synchronized (lock) {
+        connected.set(false);
+        connecting.set(false);
       }
-
       super.channelInactive(ctx);
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-      logger.warn("Channel exception: {}", cause.getMessage());
-
-      // 更新 SelectionKey 状态
-      if (selectionKeyAdapter != null) {
-        selectionKeyAdapter.setConnected(false);
-        selectionKeyAdapter.setReadReady(false);
-        selectionKeyAdapter.cancel(); // 取消 SelectionKey
-      }
-
-      ctx.close();
-    }
-  }
-
-  // SelectionKey 适配器类
-  private static class NettySelectionKeyAdapter extends SelectionKey {
-    private final NettyTNonBlockingTransport transport;
-    private Selector selector;
-    private int interestOps;
-    private int readyOps = 0;
-    private volatile boolean connected = false;
-    private volatile boolean readReady = false;
-    private volatile boolean valid = true;
-    private Set<SelectionKey> selectedKeysReference;
-
-    public NettySelectionKeyAdapter(
-        NettyTNonBlockingTransport transport, Selector selector, int ops) {
-      this.transport = transport;
-      this.selector = selector;
-      this.interestOps = ops;
-    }
-
-    public void setSelectedKeysReference(Set<SelectionKey> selectedKeys) {
-      this.selectedKeysReference = selectedKeys;
-    }
-
-    @Override
-    public SelectableChannel channel() {
-      return null; // Netty 管理通道
-    }
-
-    @Override
-    public Selector selector() {
-      return selector;
-    }
-
-    @Override
-    public void cancel() {
-      this.valid = false;
-      if (selectedKeysReference != null) {
-        selectedKeysReference.remove(this);
-      }
-      logger.info("SelectionKey cancelled");
-    }
-
-    @Override
-    public boolean isValid() {
-      return valid;
-    }
-
-    @Override
-    public SelectionKey interestOps(int ops) {
-      this.interestOps = ops;
-      logger.info("SelectionKey interestOps set to: " + ops);
-      updateSelectorIfReady();
-      return this;
-    }
-
-    @Override
-    public int interestOps() {
-      return interestOps;
-    }
-
-    @Override
-    public int readyOps() {
-      int ops = 0;
-
-      // 检查连接状态
-      if (connected && (interestOps & OP_CONNECT) != 0) {
-        ops |= OP_CONNECT;
-      }
-
-      // 检查读取状态
-      if (readReady && (interestOps & OP_READ) != 0) {
-        ops |= OP_READ;
-      }
-
-      // 写入通常总是就绪的(如果连接是活跃的)
-      if ((interestOps & OP_WRITE) != 0 && transport.isOpen()) {
-        ops |= OP_WRITE;
-      }
-
-      if (ops != readyOps) {
-        logger.info("SelectionKey readyOps changed: " + readyOps + " -> " + 
ops);
-      }
-
-      readyOps = ops;
-      return readyOps;
-    }
-
-    public void setConnected(boolean connected) {
-      this.connected = connected;
-      updateSelectorIfReady();
-    }
-
-    public void setReadReady(boolean readReady) {
-      if (this.readReady != readReady) {
-        logger.info("ReadReady changed: {} -> {}", this.readReady, readReady);
+      logger.error("Channel exception: {}", cause.getMessage());
+      synchronized (lock) {
+        ctx.close();
       }
-      this.readReady = readReady;
-      updateSelectorIfReady();
     }
-
-    private void updateSelectorIfReady() {
-      if (selector != null && isValid() && selectedKeysReference != null) {
-        int ready = readyOps();
-        if (ready != 0) {
-          try {
-            selectedKeysReference.add(this);
-            selector.wakeup();
-            logger.info("Added self to selectedKeys via reference, readyOps: " 
+ ready);
-          } catch (Exception e) {
-            logger.warn("Failed to add to selectedKeys: " + e.getMessage());
-          }
-        }
-      }
-    }
-  }
-
-  // 辅助方法:获取远程地址
-  public SocketAddress getRemoteAddress() {
-    return channel != null ? channel.remoteAddress() : null;
-  }
-
-  // 辅助方法:获取本地地址
-  public SocketAddress getLocalAddress() {
-    return channel != null ? channel.localAddress() : null;
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index a8d5c330c5b..50c8f5bc9a0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -61,10 +61,10 @@ public class ConfigNodeConfig {
   private int configRegionId = 0;
 
   /** ConfigNodeGroup consensus protocol. */
-  private String configNodeConsensusProtocolClass = 
ConsensusFactory.SIMPLE_CONSENSUS;
+  private String configNodeConsensusProtocolClass = 
ConsensusFactory.RATIS_CONSENSUS;
 
   /** Schema region consensus protocol. */
-  private String schemaRegionConsensusProtocolClass = 
ConsensusFactory.SIMPLE_CONSENSUS;
+  private String schemaRegionConsensusProtocolClass = 
ConsensusFactory.RATIS_CONSENSUS;
 
   /** Default number of SchemaRegion replicas. */
   private int schemaReplicationFactor = 1;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index e079fac021f..59545dbec0a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool;
 import 
org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
@@ -35,7 +34,6 @@ import 
org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool
 import 
org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
-import org.apache.iotdb.confignode.client.sync.SyncDataNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
@@ -278,15 +276,15 @@ public class HeartbeatService {
               configManager.getPipeManager().getPipeRuntimeCoordinator());
       configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
       addConfigNodeLocationsToReq(dataNodeId, heartbeatReq);
-//      if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) {
-//        SyncDataNodeHeartbeatClientPool.getInstance()
-//            .getDataNodeHeartBeat(
-//                dataNodeInfo.getLocation().getInternalEndPoint(), 
heartbeatReq, handler);
-//      } else {
-        AsyncDataNodeHeartbeatClientPool.getInstance()
-            .getDataNodeHeartBeat(
-                dataNodeInfo.getLocation().getInternalEndPoint(), 
heartbeatReq, handler);
-//      }
+      //      if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) {
+      //        SyncDataNodeHeartbeatClientPool.getInstance()
+      //            .getDataNodeHeartBeat(
+      //                dataNodeInfo.getLocation().getInternalEndPoint(), 
heartbeatReq, handler);
+      //      } else {
+      AsyncDataNodeHeartbeatClientPool.getInstance()
+          .getDataNodeHeartBeat(
+              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, 
handler);
+      //      }
     }
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 783769edaa4..852e55a65d3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -406,7 +406,7 @@ public class CommonConfig {
   private volatile Pattern trustedUriPattern = Pattern.compile("file:.*");
 
   /** Enable the thrift Service ssl. */
-  private boolean enableSSL = false;
+  private boolean enableSSL = true;
 
   /** ssl key Store Path. */
   private String keyStorePath = "/Users/ht/.keystore";

Reply via email to