This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch ssl_between_nodes in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e9a0d58348ad772120b0cd3f8dac339b360c8829 Author: HTHou <[email protected]> AuthorDate: Fri Jul 18 10:19:12 2025 +0800 add more client supports --- .../iot/client/SyncIoTConsensusServiceClient.java | 20 +++++++++--- .../iotdb/commons/client/ainode/AINodeClient.java | 36 +++++++++++++++++----- .../client/ainode/AsyncAINodeServiceClient.java | 17 ++++++++-- .../async/AsyncPipeConsensusServiceClient.java | 16 ++++++++-- 4 files changed, 73 insertions(+), 16 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java index da120fd56d7..9a6810d0ecd 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.ThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService; import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; import org.apache.iotdb.rpc.TConfigurationConst; @@ -36,6 +38,7 @@ import org.apache.thrift.transport.TTransportException; public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client implements ThriftClient, AutoCloseable { + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private final boolean printLogWhenEncounterException; private final TEndPoint endpoint; @@ -50,12 +53,21 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client property .getProtocolFactory() .getProtocol( - DeepCopyRpcTransportFactory.INSTANCE.getTransport( - new TSocket( - TConfigurationConst.defaultTConfiguration, + commonConfig.isEnableSSL() + ? DeepCopyRpcTransportFactory.INSTANCE.getTransport( endpoint.getIp(), endpoint.getPort(), - property.getConnectionTimeoutMs())))); + property.getConnectionTimeoutMs(), + commonConfig.getTrustStorePath(), + commonConfig.getTrustStorePwd(), + commonConfig.getKeyStorePath(), + commonConfig.getKeyStorePwd()) + : DeepCopyRpcTransportFactory.INSTANCE.getTransport( + new TSocket( + TConfigurationConst.defaultTConfiguration, + endpoint.getIp(), + endpoint.getPort(), + property.getConnectionTimeoutMs())))); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; this.clientManager = clientManager; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java index 99479256664..f896dd33218 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AINodeClient.java @@ -38,6 +38,8 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.ThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.ainode.LoadModelException; import org.apache.iotdb.commons.model.ModelInformation; import org.apache.iotdb.rpc.TConfigurationConst; @@ -46,6 +48,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.thrift.TException; +import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -67,6 +70,8 @@ public class AINodeClient implements AutoCloseable, ThriftClient { private static final Logger logger = LoggerFactory.getLogger(AINodeClient.class); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + private final TEndPoint endPoint; private TTransport transport; @@ -94,14 +99,29 @@ public class AINodeClient implements AutoCloseable, ThriftClient { private void init() throws TException { try { - transport = - new TFramedTransport.Factory() - .getTransport( - new TSocket( - TConfigurationConst.defaultTConfiguration, - endPoint.getIp(), - endPoint.getPort(), - property.getConnectionTimeoutMs())); + if (commonConfig.isEnableSSL()) { + TSSLTransportFactory.TSSLTransportParameters params = + new TSSLTransportFactory.TSSLTransportParameters(); + params.setTrustStore(commonConfig.getTrustStorePath(), commonConfig.getTrustStorePwd()); + params.setKeyStore(commonConfig.getKeyStorePath(), commonConfig.getKeyStorePwd()); + transport = + new TFramedTransport.Factory() + .getTransport( + TSSLTransportFactory.getClientSocket( + endPoint.getIp(), + endPoint.getPort(), + property.getConnectionTimeoutMs(), + params)); + } else { + transport = + new TFramedTransport.Factory() + .getTransport( + new TSocket( + TConfigurationConst.defaultTConfiguration, + endPoint.getIp(), + endPoint.getPort(), + property.getConnectionTimeoutMs())); + } if (!transport.isOpen()) { transport.open(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java index 3276923deb7..d5dda4f6353 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ainode/AsyncAINodeServiceClient.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.rpc.TNonblockingSocketWrapper; import org.apache.commons.pool2.PooledObject; @@ -36,6 +38,8 @@ import java.io.IOException; public class AsyncAINodeServiceClient extends IAINodeRPCService.AsyncClient implements ThriftClient { + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + private final boolean printLogWhenEncounterException; private final TEndPoint endPoint; private final ClientManager<TEndPoint, AsyncAINodeServiceClient> clientManager; @@ -49,8 +53,17 @@ public class AsyncAINodeServiceClient extends IAINodeRPCService.AsyncClient super( property.getProtocolFactory(), tClientManager, - TNonblockingSocketWrapper.wrap( - endPoint.getIp(), endPoint.getPort(), property.getConnectionTimeoutMs())); + commonConfig.isEnableSSL() + ? TNonblockingSocketWrapper.wrap( + endPoint.getIp(), + endPoint.getPort(), + property.getConnectionTimeoutMs(), + commonConfig.getKeyStorePath(), + commonConfig.getKeyStorePwd(), + commonConfig.getTrustStorePath(), + commonConfig.getTrustStorePwd()) + : TNonblockingSocketWrapper.wrap( + endPoint.getIp(), endPoint.getPort(), property.getConnectionTimeoutMs())); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endPoint = endPoint; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeConsensusServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeConsensusServiceClient.java index 6f05350c089..c50a3523efe 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeConsensusServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeConsensusServiceClient.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.consensus.pipe.thrift.PipeConsensusIService; import org.apache.iotdb.rpc.TNonblockingSocketWrapper; @@ -43,6 +45,7 @@ public class AsyncPipeConsensusServiceClient extends PipeConsensusIService.Async private static final Logger LOGGER = LoggerFactory.getLogger(AsyncPipeConsensusServiceClient.class); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private static final AtomicInteger idGenerator = new AtomicInteger(0); private final int id = idGenerator.incrementAndGet(); @@ -60,8 +63,17 @@ public class AsyncPipeConsensusServiceClient extends PipeConsensusIService.Async super( property.getProtocolFactory(), tAsyncClientManager, - TNonblockingSocketWrapper.wrap( - endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + commonConfig.isEnableSSL() + ? TNonblockingSocketWrapper.wrap( + endpoint.getIp(), + endpoint.getPort(), + property.getConnectionTimeoutMs(), + commonConfig.getKeyStorePath(), + commonConfig.getKeyStorePwd(), + commonConfig.getTrustStorePath(), + commonConfig.getTrustStorePwd()) + : TNonblockingSocketWrapper.wrap( + endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint;
