This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch ssl_between_nodes in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4d4284c19eb3a4f80eaf3251f30442edd2f19542 Author: HTHou <[email protected]> AuthorDate: Tue Jun 24 15:14:31 2025 +0800 deving --- iotdb-client/service-rpc/pom.xml | 8 + .../iotdb/rpc/NettyTNonBlockingTransport.java | 607 +++++++++++++++++++++ .../iotdb/rpc/TNonblockingSocketWrapper.java | 17 + .../iotdb/confignode/conf/ConfigNodeConfig.java | 4 +- .../iotdb/confignode/manager/load/LoadManager.java | 2 +- .../manager/load/service/HeartbeatService.java | 18 +- .../manager/load/service/TopologyService.java | 9 +- .../iot/client/AsyncIoTConsensusServiceClient.java | 15 +- .../AsyncConfigNodeInternalServiceClient.java | 15 +- .../async/AsyncDataNodeExternalServiceClient.java | 15 +- .../async/AsyncDataNodeInternalServiceClient.java | 15 +- .../AsyncDataNodeMPPDataExchangeServiceClient.java | 15 +- .../apache/iotdb/commons/conf/CommonConfig.java | 12 +- 13 files changed, 717 insertions(+), 35 deletions(-) diff --git a/iotdb-client/service-rpc/pom.xml b/iotdb-client/service-rpc/pom.xml index d7efd73817c..f4b3a7e2e90 100644 --- a/iotdb-client/service-rpc/pom.xml +++ b/iotdb-client/service-rpc/pom.xml @@ -84,6 +84,14 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + </dependency> </dependencies> <build> <plugins> diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java new file mode 100644 index 00000000000..63f4611d325 --- /dev/null +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.security.KeyStore; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +public class NettyTNonBlockingTransport extends TNonblockingTransport { + + private static final Logger logger = LoggerFactory.getLogger(NettyTNonBlockingTransport.class); + + private final String host; + private final int port; + private final int connectTimeoutMs = 60000; + private final EventLoopGroup eventLoopGroup; + private final Bootstrap bootstrap; + private Channel channel; + private final AtomicBoolean connected = new AtomicBoolean(false); + private final AtomicBoolean connecting = new AtomicBoolean(false); + private final BlockingQueue<ByteBuf> readQueue = new LinkedBlockingQueue<>(); + private final Object writeLock = new Object(); + private NettySelectionKeyAdapter selectionKeyAdapter; + + // SSL 配置 + private final String keystorePath; + private final String keystorePassword; + private final String truststorePath; + private final String truststorePassword; + private boolean sslEnabled = false; + + public NettyTNonBlockingTransport( + String host, + int port, + String keystorePath, + String keystorePassword, + String truststorePath, + String truststorePassword) + throws TTransportException { + super(new TConfiguration()); + this.host = host; + this.port = port; + this.eventLoopGroup = new NioEventLoopGroup(); + this.bootstrap = new Bootstrap(); + this.keystorePath = keystorePath; + this.keystorePassword = keystorePassword; + this.truststorePath = truststorePath; + this.truststorePassword = truststorePassword; + this.sslEnabled = true; + initBootstrap(); + } + + private void initBootstrap() { + bootstrap + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs) + .handler( + new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + logger.info("Initializing channel for {}:{}", host, port); + + ChannelPipeline pipeline = ch.pipeline(); + + // 添加 SSL 处理器(如果启用) + if (sslEnabled) { + SslContext sslContext = createSslContext(); + SslHandler sslHandler = sslContext.newHandler(ch.alloc(), host, port); + // 增加握手超时时间 + sslHandler.setHandshakeTimeoutMillis(30000); + + pipeline.addLast("ssl", sslHandler); + // 添加SSL握手完成监听器 + sslHandler + .handshakeFuture() + .addListener( + future -> { + if (future.isSuccess()) { + logger.info("SSL handshake completed successfully"); + } else { + logger.info("SSL handshake failed: ", future.cause()); + } + }); + } + + // 添加业务处理器 + pipeline.addLast("handler", new NettyTransportHandler()); + } + }); + } + + private SslContext createSslContext() throws Exception { + SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); + + // 配置 KeyStore(客户端证书) + if (keystorePath != null && keystorePassword != null) { + KeyStore keyStore = KeyStore.getInstance("JKS"); + try (FileInputStream fis = new FileInputStream(keystorePath)) { + keyStore.load(fis, keystorePassword.toCharArray()); + } + KeyManagerFactory kmf = + KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(keyStore, keystorePassword.toCharArray()); + sslContextBuilder.keyManager(kmf); + } + + // 配置 TrustStore(信任的服务器证书) + if (truststorePath != null && truststorePassword != null) { + KeyStore trustStore = KeyStore.getInstance("JKS"); + try (FileInputStream fis = new FileInputStream(truststorePath)) { + trustStore.load(fis, truststorePassword.toCharArray()); + } + TrustManagerFactory tmf = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(trustStore); + sslContextBuilder.trustManager(tmf); + } + + return sslContextBuilder.build(); + } + + @Override + public boolean isOpen() { + return channel != null && channel.isActive() && connected.get(); + } + + @Override + public void open() throws TTransportException { + throw new RuntimeException("open() is not implemented for NettyTNonblockingTransport"); + } + + /** Perform a nonblocking read into buffer. */ + public int read(ByteBuffer buffer) throws TTransportException { + if (!isOpen()) { + logger.info("Transport not open for ByteBuffer read"); + throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); + } + + try { + ByteBuf byteBuf = readQueue.take(); +// if (byteBuf == null) { +// logger.info("No data available for ByteBuffer read (non-blocking)"); +// return 0; // 非阻塞读取,没有数据时返回 0 +// } + + int available = Math.min(buffer.remaining(), byteBuf.readableBytes()); + if (available > 0) { + // 从 ByteBuf 读取数据到 ByteBuffer + byte[] tempArray = new byte[available]; + byteBuf.readBytes(tempArray); + buffer.put(tempArray); + + logger.info( + "Read {} bytes into ByteBuffer, remaining space: {}", available, buffer.remaining()); + } + + // 如果还有剩余数据,创建一个新的 ByteBuf 包含剩余数据 + if (byteBuf.readableBytes() > 0) { + ByteBuf remaining = byteBuf.slice(); + remaining.retain(); + readQueue.offer(remaining); + logger.info("Put back {} remaining bytes", remaining.readableBytes()); + } + + byteBuf.release(); + return available; + + } catch (Exception e) { + logger.warn("ByteBuffer read failed: ", e); + throw new TTransportException(TTransportException.UNKNOWN, e); + } + } + + /** Reads from the underlying input stream if not null. */ + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + if (!isOpen()) { + logger.info( + "Transport not open for read - channel: " + + (channel != null ? channel.isActive() : "null") + + ", connected: " + + connected.get()); + throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); + } + + try { + // 使用 ByteBuffer 包装数组进行读取 + ByteBuffer buffer = ByteBuffer.wrap(buf, off, len); + return read(buffer); + } catch (Exception e) { + logger.warn("Read failed: ", e); + throw new TTransportException(TTransportException.UNKNOWN, e); + } + } + + /** Perform a nonblocking write of the data in buffer; */ + public int write(ByteBuffer buffer) throws TTransportException { + if (!isOpen()) { + logger.info("Transport not open for ByteBuffer write"); + throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); + } + + int remaining = buffer.remaining(); + if (remaining == 0) { + return 0; + } + + logger.info("Writing " + remaining + " bytes from ByteBuffer"); + + synchronized (writeLock) { + // 创建 ByteBuf 从 ByteBuffer + ByteBuf byteBuf = Unpooled.buffer(); + byteBuf.writeBytes(buffer); + ChannelFuture future = channel.writeAndFlush(byteBuf); + + final int bytesToWrite = remaining; + future.addListener( + (GenericFutureListener<ChannelFuture>) + future1 -> { + if (future1.isSuccess()) { + logger.info( + "ByteBuffer write completed successfully: " + bytesToWrite + " bytes"); + } else { + logger.warn("ByteBuffer write failed: " + future1.cause().getMessage()); + future1.cause().printStackTrace(); + } + }); + } + + // 对于非阻塞写入,我们假设所有数据都能写入 + // 实际的写入状态通过 Future 监听器处理 + return remaining; + } + + /** Writes to the underlying output stream if not null. */ + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + if (!isOpen()) { + logger.info("Transport not open for write"); + throw new TTransportException(TTransportException.NOT_OPEN, "Transport not open"); + } + + // 使用 ByteBuffer 包装数组进行写入 + ByteBuffer buffer = ByteBuffer.wrap(buf, off, len); + write(buffer); + } + + @Override + public void flush() throws TTransportException { + // Not supported by SocketChannel. + } + + @Override + public void close() { + connected.set(false); + if (channel != null) { + channel.close(); + } + eventLoopGroup.shutdownGracefully(); + } + + @Override + public boolean startConnect() throws IOException { + if (connected.get() || connecting.get()) { + logger.info("Starting connection return " + (connected.get() || connecting.get())); + return connected.get(); + } + + if (!connecting.compareAndSet(false, true)) { + return false; + } + logger.info("Starting connection to " + host + ":" + port); + + try { + ChannelFuture future = bootstrap.connect(host, port); + future.addListener( + (GenericFutureListener<ChannelFuture>) + future1 -> { + if (future1.isSuccess()) { + logger.info("Connection established successfully"); + channel = future1.channel(); + connected.set(true); + if (selectionKeyAdapter != null) { + selectionKeyAdapter.setConnected(true); + } + } + connecting.set(false); + }); + future.get(); + return false; // 异步连接,立即返回 false + } catch (Exception e) { + connecting.set(false); +// throw new IOException("Failed to start connection", e); + return false; + } + } + + @Override + public boolean finishConnect() throws IOException { + return connected.get(); + } + + @Override + public SelectionKey registerSelector(Selector selector, int interests) throws IOException { + if (selectionKeyAdapter == null) { + selectionKeyAdapter = new NettySelectionKeyAdapter(this, selector, interests); + + // 尝试通过反射获取 selectedKeys 的可修改引用 + try { + // 尝试不同的字段名,因为不同的 Selector 实现可能使用不同的字段名 + String[] possibleFieldNames = {"selectedKeys", "publicSelectedKeys", "keys"}; + Field selectedKeysField = null; + + for (String fieldName : possibleFieldNames) { + try { + selectedKeysField = selector.getClass().getSuperclass().getDeclaredField(fieldName); + break; + } catch (NoSuchFieldException e) { + // 继续尝试下一个字段名 + } + } + + if (selectedKeysField != null) { + selectedKeysField.setAccessible(true); + Object selectedKeysObj = selectedKeysField.get(selector); + + if (selectedKeysObj instanceof Set) { + @SuppressWarnings("unchecked") + Set<SelectionKey> selectedKeys = (Set<SelectionKey>) selectedKeysObj; + selectionKeyAdapter.setSelectedKeysReference(selectedKeys); + logger.info("Successfully obtained selectedKeys reference via reflection"); + try { + selectionKeyAdapter.selectedKeysReference.add(selectionKeyAdapter); + selector.wakeup(); + } catch (Exception e) { + logger.warn("Failed to add to selectedKeys: " + e.getMessage()); + } + } + } + } catch (Exception e) { + logger.warn("Failed to access selectedKeys via reflection: " + e.getMessage()); + // 继续执行,使用备用方案 + } + + } else { + selectionKeyAdapter.interestOps(interests); + } + + return selectionKeyAdapter; + } + + @Override + public String toString() { + return "[remote: " + getRemoteAddress() + ", local: " + getLocalAddress() + "]"; + } + + // Netty 处理器 + private class NettyTransportHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + logger.info("Channel active: " + ctx.channel().remoteAddress()); + + connected.set(true); + + // 更新 SelectionKey 状态 + if (selectionKeyAdapter != null) { + selectionKeyAdapter.setConnected(true); + selectionKeyAdapter.setReadReady(true); + } + + super.channelActive(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf byteBuf = (ByteBuf) msg; + logger.info("Received " + byteBuf.readableBytes() + " bytes"); + + // 保留引用计数,将数据放入读取队列 + readQueue.offer(byteBuf.retain()); + byteBuf.release(); // 释放原始引用 + + // 通知选择器适配器有数据可读 + if (selectionKeyAdapter != null) { + selectionKeyAdapter.setReadReady(true); + } + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + logger.info("Channel inactive"); + + connected.set(false); + connecting.set(false); + + // 更新 SelectionKey 状态 + if (selectionKeyAdapter != null) { + selectionKeyAdapter.setConnected(false); + selectionKeyAdapter.setReadReady(false); + } + + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + logger.warn("Channel exception: {}", cause.getMessage()); + + // 更新 SelectionKey 状态 + if (selectionKeyAdapter != null) { + selectionKeyAdapter.setConnected(false); + selectionKeyAdapter.setReadReady(false); + selectionKeyAdapter.cancel(); // 取消 SelectionKey + } + + ctx.close(); + } + } + + // SelectionKey 适配器类 + private static class NettySelectionKeyAdapter extends SelectionKey { + private final NettyTNonBlockingTransport transport; + private Selector selector; + private int interestOps; + private int readyOps = 0; + private volatile boolean connected = false; + private volatile boolean readReady = false; + private volatile boolean valid = true; + private Set<SelectionKey> selectedKeysReference; + + public NettySelectionKeyAdapter( + NettyTNonBlockingTransport transport, Selector selector, int ops) { + this.transport = transport; + this.selector = selector; + this.interestOps = ops; + } + + public void setSelectedKeysReference(Set<SelectionKey> selectedKeys) { + this.selectedKeysReference = selectedKeys; + } + + @Override + public SelectableChannel channel() { + return null; // Netty 管理通道 + } + + @Override + public Selector selector() { + return selector; + } + + @Override + public void cancel() { + this.valid = false; + if (selectedKeysReference != null) { + selectedKeysReference.remove(this); + } + logger.info("SelectionKey cancelled"); + } + + @Override + public boolean isValid() { + return valid; + } + + @Override + public SelectionKey interestOps(int ops) { + this.interestOps = ops; + logger.info("SelectionKey interestOps set to: " + ops); + updateSelectorIfReady(); + return this; + } + + @Override + public int interestOps() { + return interestOps; + } + + @Override + public int readyOps() { + int ops = 0; + + // 检查连接状态 + if (connected && (interestOps & OP_CONNECT) != 0) { + ops |= OP_CONNECT; + } + + // 检查读取状态 + if (readReady && (interestOps & OP_READ) != 0) { + ops |= OP_READ; + } + + // 写入通常总是就绪的(如果连接是活跃的) + if ((interestOps & OP_WRITE) != 0 && transport.isOpen()) { + ops |= OP_WRITE; + } + + if (ops != readyOps) { + logger.info("SelectionKey readyOps changed: " + readyOps + " -> " + ops); + } + + readyOps = ops; + return readyOps; + } + + public void setConnected(boolean connected) { + this.connected = connected; + updateSelectorIfReady(); + } + + public void setReadReady(boolean readReady) { + if (this.readReady != readReady) { + logger.info("ReadReady changed: {} -> {}", this.readReady, readReady); + } + this.readReady = readReady; + updateSelectorIfReady(); + } + + private void updateSelectorIfReady() { + if (selector != null && isValid() && selectedKeysReference != null) { + int ready = readyOps(); + if (ready != 0) { + try { + selectedKeysReference.add(this); + selector.wakeup(); + logger.info("Added self to selectedKeys via reference, readyOps: " + ready); + } catch (Exception e) { + logger.warn("Failed to add to selectedKeys: " + e.getMessage()); + } + } + } + } + } + + // 辅助方法:获取远程地址 + public SocketAddress getRemoteAddress() { + return channel != null ? channel.remoteAddress() : null; + } + + // 辅助方法:获取本地地址 + public SocketAddress getLocalAddress() { + return channel != null ? channel.localAddress() : null; + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java index 5f2b6cde2f4..9370d4a4373 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java @@ -20,6 +20,7 @@ package org.apache.iotdb.rpc; import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TTransportException; import java.io.IOException; @@ -58,5 +59,21 @@ public class TNonblockingSocketWrapper { } } + public static TNonblockingTransport wrap( + String host, + int port, + String keyStorePath, + String keyStorePwd, + String trustStorePath, + String trustStorePwd) { + try { + return new NettyTNonBlockingTransport( + host, port, keyStorePath, keyStorePwd, trustStorePath, trustStorePwd); + } catch (TTransportException e) { + // never happen + return null; + } + } + private TNonblockingSocketWrapper() {} } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 50c8f5bc9a0..a8d5c330c5b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -61,10 +61,10 @@ public class ConfigNodeConfig { private int configRegionId = 0; /** ConfigNodeGroup consensus protocol. */ - private String configNodeConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS; + private String configNodeConsensusProtocolClass = ConsensusFactory.SIMPLE_CONSENSUS; /** Schema region consensus protocol. */ - private String schemaRegionConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS; + private String schemaRegionConsensusProtocolClass = ConsensusFactory.SIMPLE_CONSENSUS; /** Default number of SchemaRegion replicas. */ private int schemaReplicationFactor = 1; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 54dd582551d..c1651039ec5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -152,7 +152,7 @@ public class LoadManager { statisticsService.startLoadStatisticsService(); eventService.startEventService(); partitionBalancer.setupPartitionBalancer(); - topologyService.startTopologyService(); +// topologyService.startTopologyService(); } public void stopLoadServices() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 40418a00efc..63a6da33fc3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -278,15 +278,15 @@ public class HeartbeatService { configManager.getPipeManager().getPipeRuntimeCoordinator()); configManager.getClusterQuotaManager().updateSpaceQuotaUsage(); addConfigNodeLocationsToReq(dataNodeId, heartbeatReq); - if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) { - SyncDataNodeHeartbeatClientPool.getInstance() - .getDataNodeHeartBeat( - dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); - } else { - AsyncDataNodeHeartbeatClientPool.getInstance() - .getDataNodeHeartBeat( - dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); - } + if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) { + SyncDataNodeHeartbeatClientPool.getInstance() + .getDataNodeHeartBeat( + dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); + } else { + AsyncDataNodeHeartbeatClientPool.getInstance() + .getDataNodeHeartBeat( + dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler); + } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index e05f00415bd..3a4ff73ecae 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -27,7 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; @@ -182,12 +181,8 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK, nodeLocations, dataNodeLocationMap); - if (CommonDescriptor.getInstance().getConfig().isEnableSSL()) { - // TODO: Haonan do it syncly - } else { - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, PROBING_TIMEOUT_MS); - } + CnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, PROBING_TIMEOUT_MS); final List<TTestConnectionResult> results = new ArrayList<>(); dataNodeAsyncRequestContext .getResponseMap() diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java index cb635c8e6dd..4419ab2a72d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService; import org.apache.iotdb.rpc.TNonblockingSocketWrapper; @@ -41,6 +43,7 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl private static final Logger logger = LoggerFactory.getLogger(AsyncIoTConsensusServiceClient.class); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private final boolean printLogWhenEncounterException; private final TEndPoint endpoint; @@ -55,8 +58,16 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl super( property.getProtocolFactory(), tAsyncClientManager, - TNonblockingSocketWrapper.wrap( - endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + commonConfig.isEnableSSL() + ? TNonblockingSocketWrapper.wrap( + endpoint.getIp(), + endpoint.getPort(), + commonConfig.getKeyStorePath(), + commonConfig.getKeyStorePwd(), + commonConfig.getTrustStorePath(), + commonConfig.getTrustStorePwd()) + : TNonblockingSocketWrapper.wrap( + endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java index 1b3d6a21a1f..0fae68ec249 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.rpc.TNonblockingSocketWrapper; @@ -41,6 +43,7 @@ public class AsyncConfigNodeInternalServiceClient extends IConfigNodeRPCService. private static final Logger logger = LoggerFactory.getLogger(AsyncConfigNodeInternalServiceClient.class); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private long originalTimeout = -1; @@ -57,8 +60,16 @@ public class AsyncConfigNodeInternalServiceClient extends IConfigNodeRPCService. super( property.getProtocolFactory(), tClientManager, - TNonblockingSocketWrapper.wrap( - endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + commonConfig.isEnableSSL() + ? TNonblockingSocketWrapper.wrap( + endpoint.getIp(), + endpoint.getPort(), + commonConfig.getKeyStorePath(), + commonConfig.getKeyStorePwd(), + commonConfig.getTrustStorePath(), + commonConfig.getTrustStorePwd()) + : TNonblockingSocketWrapper.wrap( + endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java index 5de58b8eef1..dad3eb02dc4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService; import org.apache.iotdb.rpc.TNonblockingSocketWrapper; @@ -42,6 +44,7 @@ public class AsyncDataNodeExternalServiceClient extends IDataNodeRPCService.Asyn private static final Logger logger = LoggerFactory.getLogger(AsyncDataNodeExternalServiceClient.class); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private final boolean printLogWhenEncounterException; @@ -57,8 +60,16 @@ public class AsyncDataNodeExternalServiceClient extends IDataNodeRPCService.Asyn super( property.getProtocolFactory(), tClientManager, - TNonblockingSocketWrapper.wrap( - endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + commonConfig.isEnableSSL() + ? TNonblockingSocketWrapper.wrap( + endpoint.getIp(), + endpoint.getPort(), + commonConfig.getKeyStorePath(), + commonConfig.getKeyStorePwd(), + commonConfig.getTrustStorePath(), + commonConfig.getTrustStorePwd()) + : TNonblockingSocketWrapper.wrap( + endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java index e56aab91c0d..b39fce3bb87 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService; import org.apache.iotdb.rpc.TNonblockingSocketWrapper; @@ -42,6 +44,7 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn private static final Logger logger = LoggerFactory.getLogger(AsyncDataNodeInternalServiceClient.class); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); public long originalTimeout = -1; @@ -59,8 +62,16 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn super( property.getProtocolFactory(), tClientManager, - TNonblockingSocketWrapper.wrap( - endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + commonConfig.isEnableSSL() + ? TNonblockingSocketWrapper.wrap( + endpoint.getIp(), + endpoint.getPort(), + commonConfig.getKeyStorePath(), + commonConfig.getKeyStorePwd(), + commonConfig.getTrustStorePath(), + commonConfig.getTrustStorePwd()) + : TNonblockingSocketWrapper.wrap( + endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java index 6434ec7f017..685105ef075 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService; import org.apache.iotdb.rpc.TNonblockingSocketWrapper; @@ -41,6 +43,7 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe private static final Logger logger = LoggerFactory.getLogger(AsyncDataNodeMPPDataExchangeServiceClient.class); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private final boolean printLogWhenEncounterException; private final TEndPoint endpoint; @@ -55,8 +58,16 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe super( property.getProtocolFactory(), tClientManager, - TNonblockingSocketWrapper.wrap( - endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + commonConfig.isEnableSSL() + ? TNonblockingSocketWrapper.wrap( + endpoint.getIp(), + endpoint.getPort(), + commonConfig.getKeyStorePath(), + commonConfig.getKeyStorePwd(), + commonConfig.getTrustStorePath(), + commonConfig.getTrustStorePwd()) + : TNonblockingSocketWrapper.wrap( + endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 038d401cb2c..849eeca860b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -405,19 +405,19 @@ public class CommonConfig { private volatile Pattern trustedUriPattern = Pattern.compile("file:.*"); /** Enable the thrift Service ssl. */ - private boolean enableSSL = false; + private boolean enableSSL = true; /** ssl key Store Path. */ - private String keyStorePath = ""; + private String keyStorePath = "/Users/ht/.keystore"; /** ssl key Store password. */ - private String keyStorePwd = ""; + private String keyStorePwd = "123456"; /** ssl trust Store Path. */ - private String trustStorePath = ""; + private String trustStorePath = "/Users/ht/.truststore"; /** ssl trust Store password. */ - private String trustStorePwd = ""; + private String trustStorePwd = "123456"; CommonConfig() { // Empty constructor @@ -2602,7 +2602,7 @@ public class CommonConfig { } public String getTrustStorePath() { - return trustStorePwd; + return trustStorePath; } public void setTrustStorePath(String trustStorePath) {
