This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch ssl_between_nodes
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4d4284c19eb3a4f80eaf3251f30442edd2f19542
Author: HTHou <[email protected]>
AuthorDate: Tue Jun 24 15:14:31 2025 +0800

    deving
---
 iotdb-client/service-rpc/pom.xml                   |   8 +
 .../iotdb/rpc/NettyTNonBlockingTransport.java      | 607 +++++++++++++++++++++
 .../iotdb/rpc/TNonblockingSocketWrapper.java       |  17 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   4 +-
 .../iotdb/confignode/manager/load/LoadManager.java |   2 +-
 .../manager/load/service/HeartbeatService.java     |  18 +-
 .../manager/load/service/TopologyService.java      |   9 +-
 .../iot/client/AsyncIoTConsensusServiceClient.java |  15 +-
 .../AsyncConfigNodeInternalServiceClient.java      |  15 +-
 .../async/AsyncDataNodeExternalServiceClient.java  |  15 +-
 .../async/AsyncDataNodeInternalServiceClient.java  |  15 +-
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |  15 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  12 +-
 13 files changed, 717 insertions(+), 35 deletions(-)

diff --git a/iotdb-client/service-rpc/pom.xml b/iotdb-client/service-rpc/pom.xml
index d7efd73817c..f4b3a7e2e90 100644
--- a/iotdb-client/service-rpc/pom.xml
+++ b/iotdb-client/service-rpc/pom.xml
@@ -84,6 +84,14 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
new file mode 100644
index 00000000000..63f4611d325
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.security.KeyStore;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class NettyTNonBlockingTransport extends TNonblockingTransport {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(NettyTNonBlockingTransport.class);
+
+  private final String host;
+  private final int port;
+  private final int connectTimeoutMs = 60000;
+  private final EventLoopGroup eventLoopGroup;
+  private final Bootstrap bootstrap;
+  private Channel channel;
+  private final AtomicBoolean connected = new AtomicBoolean(false);
+  private final AtomicBoolean connecting = new AtomicBoolean(false);
+  private final BlockingQueue<ByteBuf> readQueue = new LinkedBlockingQueue<>();
+  private final Object writeLock = new Object();
+  private NettySelectionKeyAdapter selectionKeyAdapter;
+
+  // SSL 配置
+  private final String keystorePath;
+  private final String keystorePassword;
+  private final String truststorePath;
+  private final String truststorePassword;
+  private boolean sslEnabled = false;
+
+  public NettyTNonBlockingTransport(
+      String host,
+      int port,
+      String keystorePath,
+      String keystorePassword,
+      String truststorePath,
+      String truststorePassword)
+      throws TTransportException {
+    super(new TConfiguration());
+    this.host = host;
+    this.port = port;
+    this.eventLoopGroup = new NioEventLoopGroup();
+    this.bootstrap = new Bootstrap();
+    this.keystorePath = keystorePath;
+    this.keystorePassword = keystorePassword;
+    this.truststorePath = truststorePath;
+    this.truststorePassword = truststorePassword;
+    this.sslEnabled = true;
+    initBootstrap();
+  }
+
+  private void initBootstrap() {
+    bootstrap
+        .group(eventLoopGroup)
+        .channel(NioSocketChannel.class)
+        .option(ChannelOption.TCP_NODELAY, true)
+        .option(ChannelOption.SO_KEEPALIVE, true)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
+        .handler(
+            new ChannelInitializer<SocketChannel>() {
+              @Override
+              protected void initChannel(SocketChannel ch) throws Exception {
+                logger.info("Initializing channel for {}:{}", host, port);
+
+                ChannelPipeline pipeline = ch.pipeline();
+
+                // 添加 SSL 处理器(如果启用)
+                if (sslEnabled) {
+                  SslContext sslContext = createSslContext();
+                  SslHandler sslHandler = sslContext.newHandler(ch.alloc(), 
host, port);
+                  // 增加握手超时时间
+                  sslHandler.setHandshakeTimeoutMillis(30000);
+
+                  pipeline.addLast("ssl", sslHandler);
+                  // 添加SSL握手完成监听器
+                  sslHandler
+                      .handshakeFuture()
+                      .addListener(
+                          future -> {
+                            if (future.isSuccess()) {
+                              logger.info("SSL handshake completed 
successfully");
+                            } else {
+                              logger.info("SSL handshake failed: ", 
future.cause());
+                            }
+                          });
+                }
+
+                // 添加业务处理器
+                pipeline.addLast("handler", new NettyTransportHandler());
+              }
+            });
+  }
+
+  private SslContext createSslContext() throws Exception {
+    SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
+
+    // 配置 KeyStore(客户端证书)
+    if (keystorePath != null && keystorePassword != null) {
+      KeyStore keyStore = KeyStore.getInstance("JKS");
+      try (FileInputStream fis = new FileInputStream(keystorePath)) {
+        keyStore.load(fis, keystorePassword.toCharArray());
+      }
+      KeyManagerFactory kmf =
+          
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+      kmf.init(keyStore, keystorePassword.toCharArray());
+      sslContextBuilder.keyManager(kmf);
+    }
+
+    // 配置 TrustStore(信任的服务器证书)
+    if (truststorePath != null && truststorePassword != null) {
+      KeyStore trustStore = KeyStore.getInstance("JKS");
+      try (FileInputStream fis = new FileInputStream(truststorePath)) {
+        trustStore.load(fis, truststorePassword.toCharArray());
+      }
+      TrustManagerFactory tmf =
+          
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+      tmf.init(trustStore);
+      sslContextBuilder.trustManager(tmf);
+    }
+
+    return sslContextBuilder.build();
+  }
+
+  @Override
+  public boolean isOpen() {
+    return channel != null && channel.isActive() && connected.get();
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    throw new RuntimeException("open() is not implemented for 
NettyTNonblockingTransport");
+  }
+
+  /** Perform a nonblocking read into buffer. */
+  public int read(ByteBuffer buffer) throws TTransportException {
+    if (!isOpen()) {
+      logger.info("Transport not open for ByteBuffer read");
+      throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
+    }
+
+    try {
+      ByteBuf byteBuf = readQueue.take();
+//      if (byteBuf == null) {
+//        logger.info("No data available for ByteBuffer read (non-blocking)");
+//        return 0; // 非阻塞读取,没有数据时返回 0
+//      }
+
+      int available = Math.min(buffer.remaining(), byteBuf.readableBytes());
+      if (available > 0) {
+        // 从 ByteBuf 读取数据到 ByteBuffer
+        byte[] tempArray = new byte[available];
+        byteBuf.readBytes(tempArray);
+        buffer.put(tempArray);
+
+        logger.info(
+            "Read {} bytes into ByteBuffer, remaining space: {}", available, 
buffer.remaining());
+      }
+
+      // 如果还有剩余数据,创建一个新的 ByteBuf 包含剩余数据
+      if (byteBuf.readableBytes() > 0) {
+        ByteBuf remaining = byteBuf.slice();
+        remaining.retain();
+        readQueue.offer(remaining);
+        logger.info("Put back {} remaining bytes", remaining.readableBytes());
+      }
+
+      byteBuf.release();
+      return available;
+
+    } catch (Exception e) {
+      logger.warn("ByteBuffer read failed: ", e);
+      throw new TTransportException(TTransportException.UNKNOWN, e);
+    }
+  }
+
+  /** Reads from the underlying input stream if not null. */
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (!isOpen()) {
+      logger.info(
+          "Transport not open for read - channel: "
+              + (channel != null ? channel.isActive() : "null")
+              + ", connected: "
+              + connected.get());
+      throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
+    }
+
+    try {
+      // 使用 ByteBuffer 包装数组进行读取
+      ByteBuffer buffer = ByteBuffer.wrap(buf, off, len);
+      return read(buffer);
+    } catch (Exception e) {
+      logger.warn("Read failed: ", e);
+      throw new TTransportException(TTransportException.UNKNOWN, e);
+    }
+  }
+
+  /** Perform a nonblocking write of the data in buffer; */
+  public int write(ByteBuffer buffer) throws TTransportException {
+    if (!isOpen()) {
+      logger.info("Transport not open for ByteBuffer write");
+      throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
+    }
+
+    int remaining = buffer.remaining();
+    if (remaining == 0) {
+      return 0;
+    }
+
+    logger.info("Writing " + remaining + " bytes from ByteBuffer");
+
+    synchronized (writeLock) {
+      // 创建 ByteBuf 从 ByteBuffer
+      ByteBuf byteBuf = Unpooled.buffer();
+      byteBuf.writeBytes(buffer);
+      ChannelFuture future = channel.writeAndFlush(byteBuf);
+
+      final int bytesToWrite = remaining;
+      future.addListener(
+          (GenericFutureListener<ChannelFuture>)
+              future1 -> {
+                if (future1.isSuccess()) {
+                  logger.info(
+                      "ByteBuffer write completed successfully: " + 
bytesToWrite + " bytes");
+                } else {
+                  logger.warn("ByteBuffer write failed: " + 
future1.cause().getMessage());
+                  future1.cause().printStackTrace();
+                }
+              });
+    }
+
+    // 对于非阻塞写入,我们假设所有数据都能写入
+    // 实际的写入状态通过 Future 监听器处理
+    return remaining;
+  }
+
+  /** Writes to the underlying output stream if not null. */
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    if (!isOpen()) {
+      logger.info("Transport not open for write");
+      throw new TTransportException(TTransportException.NOT_OPEN, "Transport 
not open");
+    }
+
+    // 使用 ByteBuffer 包装数组进行写入
+    ByteBuffer buffer = ByteBuffer.wrap(buf, off, len);
+    write(buffer);
+  }
+
+  @Override
+  public void flush() throws TTransportException {
+    // Not supported by SocketChannel.
+  }
+
+  @Override
+  public void close() {
+    connected.set(false);
+    if (channel != null) {
+      channel.close();
+    }
+    eventLoopGroup.shutdownGracefully();
+  }
+
+  @Override
+  public boolean startConnect() throws IOException {
+    if (connected.get() || connecting.get()) {
+      logger.info("Starting connection return " + (connected.get() || 
connecting.get()));
+      return connected.get();
+    }
+
+    if (!connecting.compareAndSet(false, true)) {
+      return false;
+    }
+    logger.info("Starting connection to " + host + ":" + port);
+
+    try {
+      ChannelFuture future = bootstrap.connect(host, port);
+      future.addListener(
+          (GenericFutureListener<ChannelFuture>)
+              future1 -> {
+                if (future1.isSuccess()) {
+                  logger.info("Connection established successfully");
+                  channel = future1.channel();
+                  connected.set(true);
+                  if (selectionKeyAdapter != null) {
+                    selectionKeyAdapter.setConnected(true);
+                  }
+                }
+                connecting.set(false);
+              });
+      future.get();
+      return false; // 异步连接,立即返回 false
+    } catch (Exception e) {
+      connecting.set(false);
+//      throw new IOException("Failed to start connection", e);
+      return false;
+    }
+  }
+
+  @Override
+  public boolean finishConnect() throws IOException {
+    return connected.get();
+  }
+
+  @Override
+  public SelectionKey registerSelector(Selector selector, int interests) 
throws IOException {
+    if (selectionKeyAdapter == null) {
+      selectionKeyAdapter = new NettySelectionKeyAdapter(this, selector, 
interests);
+
+      // 尝试通过反射获取 selectedKeys 的可修改引用
+      try {
+        // 尝试不同的字段名,因为不同的 Selector 实现可能使用不同的字段名
+        String[] possibleFieldNames = {"selectedKeys", "publicSelectedKeys", 
"keys"};
+        Field selectedKeysField = null;
+
+        for (String fieldName : possibleFieldNames) {
+          try {
+            selectedKeysField = 
selector.getClass().getSuperclass().getDeclaredField(fieldName);
+            break;
+          } catch (NoSuchFieldException e) {
+            // 继续尝试下一个字段名
+          }
+        }
+
+        if (selectedKeysField != null) {
+          selectedKeysField.setAccessible(true);
+          Object selectedKeysObj = selectedKeysField.get(selector);
+
+          if (selectedKeysObj instanceof Set) {
+            @SuppressWarnings("unchecked")
+            Set<SelectionKey> selectedKeys = (Set<SelectionKey>) 
selectedKeysObj;
+            selectionKeyAdapter.setSelectedKeysReference(selectedKeys);
+            logger.info("Successfully obtained selectedKeys reference via 
reflection");
+            try {
+              
selectionKeyAdapter.selectedKeysReference.add(selectionKeyAdapter);
+              selector.wakeup();
+            } catch (Exception e) {
+              logger.warn("Failed to add to selectedKeys: " + e.getMessage());
+            }
+          }
+        }
+      } catch (Exception e) {
+        logger.warn("Failed to access selectedKeys via reflection: " + 
e.getMessage());
+        // 继续执行,使用备用方案
+      }
+
+    } else {
+      selectionKeyAdapter.interestOps(interests);
+    }
+
+    return selectionKeyAdapter;
+  }
+
+  @Override
+  public String toString() {
+    return "[remote: " + getRemoteAddress() + ", local: " + getLocalAddress() 
+ "]";
+  }
+
+  // Netty 处理器
+  private class NettyTransportHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+      logger.info("Channel active: " + ctx.channel().remoteAddress());
+
+      connected.set(true);
+
+      // 更新 SelectionKey 状态
+      if (selectionKeyAdapter != null) {
+        selectionKeyAdapter.setConnected(true);
+        selectionKeyAdapter.setReadReady(true);
+      }
+
+      super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+      if (msg instanceof ByteBuf) {
+        ByteBuf byteBuf = (ByteBuf) msg;
+        logger.info("Received " + byteBuf.readableBytes() + " bytes");
+
+        // 保留引用计数,将数据放入读取队列
+        readQueue.offer(byteBuf.retain());
+        byteBuf.release(); // 释放原始引用
+
+        // 通知选择器适配器有数据可读
+        if (selectionKeyAdapter != null) {
+          selectionKeyAdapter.setReadReady(true);
+        }
+      }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      logger.info("Channel inactive");
+
+      connected.set(false);
+      connecting.set(false);
+
+      // 更新 SelectionKey 状态
+      if (selectionKeyAdapter != null) {
+        selectionKeyAdapter.setConnected(false);
+        selectionKeyAdapter.setReadReady(false);
+      }
+
+      super.channelInactive(ctx);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+      logger.warn("Channel exception: {}", cause.getMessage());
+
+      // 更新 SelectionKey 状态
+      if (selectionKeyAdapter != null) {
+        selectionKeyAdapter.setConnected(false);
+        selectionKeyAdapter.setReadReady(false);
+        selectionKeyAdapter.cancel(); // 取消 SelectionKey
+      }
+
+      ctx.close();
+    }
+  }
+
+  // SelectionKey 适配器类
+  private static class NettySelectionKeyAdapter extends SelectionKey {
+    private final NettyTNonBlockingTransport transport;
+    private Selector selector;
+    private int interestOps;
+    private int readyOps = 0;
+    private volatile boolean connected = false;
+    private volatile boolean readReady = false;
+    private volatile boolean valid = true;
+    private Set<SelectionKey> selectedKeysReference;
+
+    public NettySelectionKeyAdapter(
+        NettyTNonBlockingTransport transport, Selector selector, int ops) {
+      this.transport = transport;
+      this.selector = selector;
+      this.interestOps = ops;
+    }
+
+    public void setSelectedKeysReference(Set<SelectionKey> selectedKeys) {
+      this.selectedKeysReference = selectedKeys;
+    }
+
+    @Override
+    public SelectableChannel channel() {
+      return null; // Netty 管理通道
+    }
+
+    @Override
+    public Selector selector() {
+      return selector;
+    }
+
+    @Override
+    public void cancel() {
+      this.valid = false;
+      if (selectedKeysReference != null) {
+        selectedKeysReference.remove(this);
+      }
+      logger.info("SelectionKey cancelled");
+    }
+
+    @Override
+    public boolean isValid() {
+      return valid;
+    }
+
+    @Override
+    public SelectionKey interestOps(int ops) {
+      this.interestOps = ops;
+      logger.info("SelectionKey interestOps set to: " + ops);
+      updateSelectorIfReady();
+      return this;
+    }
+
+    @Override
+    public int interestOps() {
+      return interestOps;
+    }
+
+    @Override
+    public int readyOps() {
+      int ops = 0;
+
+      // 检查连接状态
+      if (connected && (interestOps & OP_CONNECT) != 0) {
+        ops |= OP_CONNECT;
+      }
+
+      // 检查读取状态
+      if (readReady && (interestOps & OP_READ) != 0) {
+        ops |= OP_READ;
+      }
+
+      // 写入通常总是就绪的(如果连接是活跃的)
+      if ((interestOps & OP_WRITE) != 0 && transport.isOpen()) {
+        ops |= OP_WRITE;
+      }
+
+      if (ops != readyOps) {
+        logger.info("SelectionKey readyOps changed: " + readyOps + " -> " + 
ops);
+      }
+
+      readyOps = ops;
+      return readyOps;
+    }
+
+    public void setConnected(boolean connected) {
+      this.connected = connected;
+      updateSelectorIfReady();
+    }
+
+    public void setReadReady(boolean readReady) {
+      if (this.readReady != readReady) {
+        logger.info("ReadReady changed: {} -> {}", this.readReady, readReady);
+      }
+      this.readReady = readReady;
+      updateSelectorIfReady();
+    }
+
+    private void updateSelectorIfReady() {
+      if (selector != null && isValid() && selectedKeysReference != null) {
+        int ready = readyOps();
+        if (ready != 0) {
+          try {
+            selectedKeysReference.add(this);
+            selector.wakeup();
+            logger.info("Added self to selectedKeys via reference, readyOps: " 
+ ready);
+          } catch (Exception e) {
+            logger.warn("Failed to add to selectedKeys: " + e.getMessage());
+          }
+        }
+      }
+    }
+  }
+
+  // 辅助方法:获取远程地址
+  public SocketAddress getRemoteAddress() {
+    return channel != null ? channel.remoteAddress() : null;
+  }
+
+  // 辅助方法:获取本地地址
+  public SocketAddress getLocalAddress() {
+    return channel != null ? channel.localAddress() : null;
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
index 5f2b6cde2f4..9370d4a4373 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.rpc;
 
 import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
 import org.apache.thrift.transport.TTransportException;
 
 import java.io.IOException;
@@ -58,5 +59,21 @@ public class TNonblockingSocketWrapper {
     }
   }
 
+  public static TNonblockingTransport wrap(
+      String host,
+      int port,
+      String keyStorePath,
+      String keyStorePwd,
+      String trustStorePath,
+      String trustStorePwd) {
+    try {
+      return new NettyTNonBlockingTransport(
+          host, port, keyStorePath, keyStorePwd, trustStorePath, 
trustStorePwd);
+    } catch (TTransportException e) {
+      // never happen
+      return null;
+    }
+  }
+
   private TNonblockingSocketWrapper() {}
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 50c8f5bc9a0..a8d5c330c5b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -61,10 +61,10 @@ public class ConfigNodeConfig {
   private int configRegionId = 0;
 
   /** ConfigNodeGroup consensus protocol. */
-  private String configNodeConsensusProtocolClass = 
ConsensusFactory.RATIS_CONSENSUS;
+  private String configNodeConsensusProtocolClass = 
ConsensusFactory.SIMPLE_CONSENSUS;
 
   /** Schema region consensus protocol. */
-  private String schemaRegionConsensusProtocolClass = 
ConsensusFactory.RATIS_CONSENSUS;
+  private String schemaRegionConsensusProtocolClass = 
ConsensusFactory.SIMPLE_CONSENSUS;
 
   /** Default number of SchemaRegion replicas. */
   private int schemaReplicationFactor = 1;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 54dd582551d..c1651039ec5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -152,7 +152,7 @@ public class LoadManager {
     statisticsService.startLoadStatisticsService();
     eventService.startEventService();
     partitionBalancer.setupPartitionBalancer();
-    topologyService.startTopologyService();
+//    topologyService.startTopologyService();
   }
 
   public void stopLoadServices() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 40418a00efc..63a6da33fc3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -278,15 +278,15 @@ public class HeartbeatService {
               configManager.getPipeManager().getPipeRuntimeCoordinator());
       configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
       addConfigNodeLocationsToReq(dataNodeId, heartbeatReq);
-      if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) {
-        SyncDataNodeHeartbeatClientPool.getInstance()
-            .getDataNodeHeartBeat(
-                dataNodeInfo.getLocation().getInternalEndPoint(), 
heartbeatReq, handler);
-      } else {
-        AsyncDataNodeHeartbeatClientPool.getInstance()
-            .getDataNodeHeartBeat(
-                dataNodeInfo.getLocation().getInternalEndPoint(), 
heartbeatReq, handler);
-      }
+            if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) {
+              SyncDataNodeHeartbeatClientPool.getInstance()
+                  .getDataNodeHeartBeat(
+                      dataNodeInfo.getLocation().getInternalEndPoint(), 
heartbeatReq, handler);
+            } else {
+      AsyncDataNodeHeartbeatClientPool.getInstance()
+          .getDataNodeHeartBeat(
+              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, 
handler);
+            }
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
index e05f00415bd..3a4ff73ecae 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -27,7 +27,6 @@ import 
org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
 import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
 import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
@@ -182,12 +181,8 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
                 CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
                 nodeLocations,
                 dataNodeLocationMap);
-    if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) {
-      // TODO: Haonan do it syncly
-    } else {
-      CnToDnInternalServiceAsyncRequestManager.getInstance()
-          .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);
-    }
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);
     final List<TTestConnectionResult> results = new ArrayList<>();
     dataNodeAsyncRequestContext
         .getResponseMap()
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index cb635c8e6dd..4419ab2a72d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -41,6 +43,7 @@ public class AsyncIoTConsensusServiceClient extends 
IoTConsensusIService.AsyncCl
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncIoTConsensusServiceClient.class);
+  private static final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private final boolean printLogWhenEncounterException;
   private final TEndPoint endpoint;
@@ -55,8 +58,16 @@ public class AsyncIoTConsensusServiceClient extends 
IoTConsensusIService.AsyncCl
     super(
         property.getProtocolFactory(),
         tAsyncClientManager,
-        TNonblockingSocketWrapper.wrap(
-            endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
+        commonConfig.isEnableSSL()
+            ? TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(),
+                endpoint.getPort(),
+                commonConfig.getKeyStorePath(),
+                commonConfig.getKeyStorePwd(),
+                commonConfig.getTrustStorePath(),
+                commonConfig.getTrustStorePwd())
+            : TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
     setTimeout(property.getConnectionTimeoutMs());
     this.printLogWhenEncounterException = 
property.isPrintLogWhenEncounterException();
     this.endpoint = endpoint;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
index 1b3d6a21a1f..0fae68ec249 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -41,6 +43,7 @@ public class AsyncConfigNodeInternalServiceClient extends 
IConfigNodeRPCService.
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncConfigNodeInternalServiceClient.class);
+  private static final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private long originalTimeout = -1;
 
@@ -57,8 +60,16 @@ public class AsyncConfigNodeInternalServiceClient extends 
IConfigNodeRPCService.
     super(
         property.getProtocolFactory(),
         tClientManager,
-        TNonblockingSocketWrapper.wrap(
-            endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
+        commonConfig.isEnableSSL()
+            ? TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(),
+                endpoint.getPort(),
+                commonConfig.getKeyStorePath(),
+                commonConfig.getKeyStorePwd(),
+                commonConfig.getTrustStorePath(),
+                commonConfig.getTrustStorePwd())
+            : TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
     setTimeout(property.getConnectionTimeoutMs());
     this.printLogWhenEncounterException = 
property.isPrintLogWhenEncounterException();
     this.endpoint = endpoint;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
index 5de58b8eef1..dad3eb02dc4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -42,6 +44,7 @@ public class AsyncDataNodeExternalServiceClient extends 
IDataNodeRPCService.Asyn
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncDataNodeExternalServiceClient.class);
+  private static final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private final boolean printLogWhenEncounterException;
 
@@ -57,8 +60,16 @@ public class AsyncDataNodeExternalServiceClient extends 
IDataNodeRPCService.Asyn
     super(
         property.getProtocolFactory(),
         tClientManager,
-        TNonblockingSocketWrapper.wrap(
-            endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
+        commonConfig.isEnableSSL()
+            ? TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(),
+                endpoint.getPort(),
+                commonConfig.getKeyStorePath(),
+                commonConfig.getKeyStorePwd(),
+                commonConfig.getTrustStorePath(),
+                commonConfig.getTrustStorePwd())
+            : TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
     setTimeout(property.getConnectionTimeoutMs());
     this.printLogWhenEncounterException = 
property.isPrintLogWhenEncounterException();
     this.endpoint = endpoint;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index e56aab91c0d..b39fce3bb87 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -42,6 +44,7 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncDataNodeInternalServiceClient.class);
+  private static final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   public long originalTimeout = -1;
 
@@ -59,8 +62,16 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
     super(
         property.getProtocolFactory(),
         tClientManager,
-        TNonblockingSocketWrapper.wrap(
-            endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
+        commonConfig.isEnableSSL()
+            ? TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(),
+                endpoint.getPort(),
+                commonConfig.getKeyStorePath(),
+                commonConfig.getKeyStorePwd(),
+                commonConfig.getTrustStorePath(),
+                commonConfig.getTrustStorePwd())
+            : TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
     setTimeout(property.getConnectionTimeoutMs());
     this.printLogWhenEncounterException = 
property.isPrintLogWhenEncounterException();
     this.endpoint = endpoint;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 6434ec7f017..685105ef075 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -41,6 +43,7 @@ public class AsyncDataNodeMPPDataExchangeServiceClient 
extends MPPDataExchangeSe
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncDataNodeMPPDataExchangeServiceClient.class);
+  private static final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private final boolean printLogWhenEncounterException;
   private final TEndPoint endpoint;
@@ -55,8 +58,16 @@ public class AsyncDataNodeMPPDataExchangeServiceClient 
extends MPPDataExchangeSe
     super(
         property.getProtocolFactory(),
         tClientManager,
-        TNonblockingSocketWrapper.wrap(
-            endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
+        commonConfig.isEnableSSL()
+            ? TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(),
+                endpoint.getPort(),
+                commonConfig.getKeyStorePath(),
+                commonConfig.getKeyStorePwd(),
+                commonConfig.getTrustStorePath(),
+                commonConfig.getTrustStorePwd())
+            : TNonblockingSocketWrapper.wrap(
+                endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs()));
     setTimeout(property.getConnectionTimeoutMs());
     this.printLogWhenEncounterException = 
property.isPrintLogWhenEncounterException();
     this.endpoint = endpoint;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 038d401cb2c..849eeca860b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -405,19 +405,19 @@ public class CommonConfig {
   private volatile Pattern trustedUriPattern = Pattern.compile("file:.*");
 
   /** Enable the thrift Service ssl. */
-  private boolean enableSSL = false;
+  private boolean enableSSL = true;
 
   /** ssl key Store Path. */
-  private String keyStorePath = "";
+  private String keyStorePath = "/Users/ht/.keystore";
 
   /** ssl key Store password. */
-  private String keyStorePwd = "";
+  private String keyStorePwd = "123456";
 
   /** ssl trust Store Path. */
-  private String trustStorePath = "";
+  private String trustStorePath = "/Users/ht/.truststore";
 
   /** ssl trust Store password. */
-  private String trustStorePwd = "";
+  private String trustStorePwd = "123456";
 
   CommonConfig() {
     // Empty constructor
@@ -2602,7 +2602,7 @@ public class CommonConfig {
   }
 
   public String getTrustStorePath() {
-    return trustStorePwd;
+    return trustStorePath;
   }
 
   public void setTrustStorePath(String trustStorePath) {


Reply via email to