This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira5260 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b6cba8a8dce3a39af80a12f7dab2cd57628570e9 Author: Potato <[email protected]> AuthorDate: Mon Dec 26 19:57:19 2022 +0800 [IOTDB-5260] Refactor ClientManager API and Exception (#8561) Refactor ClientManager API and Exception * fix ci * update javadoc format * catch throwable for sync client * fix code smell * fix review Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../async/AsyncConfigNodeHeartbeatClientPool.java | 6 +- .../client/async/AsyncDataNodeClientPool.java | 5 +- .../async/AsyncDataNodeHeartbeatClientPool.java | 6 +- .../client/sync/SyncConfigNodeClientPool.java | 6 +- .../client/sync/SyncDataNodeClientPool.java | 8 +-- .../consensus/iot/IoTConsensusServerImpl.java | 17 ++--- .../consensus/iot/logdispatcher/LogDispatcher.java | 3 +- .../iotdb/consensus/ratis/RatisConsensus.java | 21 +++--- integration-test/import-control.xml | 1 + .../java/org/apache/iotdb/it/env/AbstractEnv.java | 28 ++++---- .../org/apache/iotdb/it/env/RemoteServerEnv.java | 34 +++++----- .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 3 +- .../confignode/it/IoTDBConfigNodeSnapshotIT.java | 3 +- .../iotdb/confignode/it/IoTDBStorageGroupIT.java | 8 +-- .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 7 +- .../it/cluster/IoTDBClusterNodeGetterIT.java | 8 +-- .../load/IoTDBClusterRegionLeaderBalancingIT.java | 6 +- .../it/load/IoTDBConfigNodeSwitchLeaderIT.java | 5 +- .../partition/IoTDBAutoRegionGroupExtensionIT.java | 4 +- .../IoTDBCustomRegionGroupExtensionIT.java | 6 +- .../it/partition/IoTDBPartitionDurableIT.java | 12 ++-- .../it/partition/IoTDBPartitionGetterIT.java | 17 ++--- .../partition/IoTDBPartitionInheritPolicyIT.java | 5 +- .../commons/client/ClientFactoryProperty.java | 7 +- .../apache/iotdb/commons/client/ClientManager.java | 78 +++++++++------------- .../iotdb/commons/client/ClientPoolProperty.java | 18 +++-- .../iotdb/commons/client/IClientManager.java | 18 ++--- .../iotdb/commons/client/IClientPoolFactory.java | 7 +- .../ClientManagerException.java} | 12 ++-- .../iotdb/commons/service/ThriftServiceThread.java | 2 +- .../iotdb/commons/client/ClientManagerTest.java | 22 +++--- .../iotdb/db/auth/ClusterAuthorityFetcher.java | 10 +-- .../metadata/template/ClusterTemplateManager.java | 6 +- .../db/mpp/execution/exchange/SinkHandle.java | 4 +- .../apache/iotdb/db/mpp/plan/TestRPCClient.java | 13 ++-- .../mpp/plan/analyze/ClusterPartitionFetcher.java | 17 +++-- .../db/mpp/plan/analyze/cache/PartitionCache.java | 11 +-- .../config/executor/ClusterConfigTaskExecutor.java | 57 ++++++++-------- .../scheduler/AbstractFragInsStateTracker.java | 4 +- .../scheduler/FixedRateFragInsStateTracker.java | 4 +- .../scheduler/FragmentInstanceDispatcherImpl.java | 4 +- .../mpp/plan/scheduler/SimpleQueryTerminator.java | 7 +- .../scheduler/load/LoadTsFileDispatcherImpl.java | 6 +- .../db/trigger/executor/TriggerFireVisitor.java | 10 ++- .../db/mpp/execution/exchange/SinkHandleTest.java | 9 +-- .../mpp/execution/exchange/SourceHandleTest.java | 12 ++-- 46 files changed, 265 insertions(+), 292 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java index 8dd67da315..3c2e3072f3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java @@ -42,12 +42,8 @@ public class AsyncConfigNodeHeartbeatClientPool { */ public void getConfigNodeHeartBeat( TEndPoint endPoint, long timestamp, ConfigNodeHeartbeatHandler handler) { - AsyncConfigNodeHeartbeatServiceClient client; try { - client = clientManager.purelyBorrowClient(endPoint); - if (client != null) { - client.getConfigNodeHeartBeat(timestamp, handler); - } + clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(timestamp, handler); } catch (Exception ignore) { // Just ignore } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java index 3973689b3b..7f0404cc69 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java @@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler; @@ -60,8 +61,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */ public class AsyncDataNodeClientPool { @@ -350,7 +349,7 @@ public class AsyncDataNodeClientPool { } public AsyncDataNodeInternalServiceClient getAsyncClient(TDataNodeLocation targetDataNode) - throws IOException { + throws ClientManagerException { return clientManager.borrowClient(targetDataNode.getInternalEndPoint()); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java index a42616c722..1a92dd6ac9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java @@ -44,12 +44,8 @@ public class AsyncDataNodeHeartbeatClientPool { */ public void getDataNodeHeartBeat( TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) { - AsyncDataNodeHeartbeatServiceClient client; try { - client = clientManager.purelyBorrowClient(endPoint); - if (client != null) { - client.getDataNodeHeartBeat(req, handler); - } + clientManager.borrowClient(endPoint).getDataNodeHeartBeat(req, handler); } catch (Exception ignore) { // Just ignore } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java index cb8e62b595..0d0f5336f3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.client.ConfigNodeRequestType; import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq; @@ -35,7 +36,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.concurrent.TimeUnit; /** Synchronously send RPC requests to ConfigNode. See confignode.thrift for more details. */ @@ -93,7 +93,7 @@ public class SyncConfigNodeClientPool { return RpcUtils.getStatus( TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType); } - } catch (Throwable e) { + } catch (Exception e) { lastException = e; LOGGER.warn( "{} failed on ConfigNode {}, because {}, retrying {}...", @@ -118,7 +118,7 @@ public class SyncConfigNodeClientPool { */ public TSStatus removeConfigNode( TConfigNodeLocation configNodeLocation, SyncConfigNodeIServiceClient client) - throws TException, IOException, InterruptedException { + throws ClientManagerException, TException, InterruptedException { TSStatus status = client.removeConfigNode(configNodeLocation); while (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { TimeUnit.MILLISECONDS.sleep(2000); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index 3de327ecbb..3136e05da1 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; @@ -42,7 +43,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.concurrent.TimeUnit; /** Synchronously send RPC requests to DataNodes. See mpp.thrift for more details. */ @@ -67,7 +67,7 @@ public class SyncDataNodeClientPool { for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) { try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) { return executeSyncRequest(requestType, client, req); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { lastException = e; if (retry != DEFAULT_RETRY_NUM - 1) { LOGGER.warn("{} failed on DataNode {}, retrying {}...", requestType, endPoint, retry + 1); @@ -86,7 +86,7 @@ public class SyncDataNodeClientPool { for (int retry = 0; retry < retryNum; retry++) { try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) { return executeSyncRequest(requestType, client, req); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { lastException = e; if (retry != retryNum - 1) { LOGGER.warn("{} failed on DataNode {}, retrying {}...", requestType, endPoint, retry + 1); @@ -167,7 +167,7 @@ public class SyncDataNodeClientPool { try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(dataNode)) { TRegionLeaderChangeReq req = new TRegionLeaderChangeReq(regionId, newLeaderNode); status = client.changeRegionLeader(req); - } catch (IOException e) { + } catch (ClientManagerException e) { LOGGER.error("Can't connect to Data node: {}", dataNode, e); status = new TSStatus(TSStatusCode.CAN_NOT_CONNECT_DATANODE.getStatusCode()); status.setMessage(e.getMessage()); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index d66d29cc6e..1d668c281a 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.iot; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; @@ -332,7 +333,7 @@ public class IoTConsensusServerImpl { reader.close(); } } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when send snapshot file to %s", targetPeer), e); } @@ -403,7 +404,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("error when inactivating %s. %s", peer, res.getStatus())); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when inactivating %s", peer), e); } @@ -420,7 +421,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("error when triggering snapshot load %s. %s", peer, res.getStatus())); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when activating %s", peer), e); } @@ -435,7 +436,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("error when activating %s. %s", peer, res.getStatus())); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when activating %s", peer), e); } @@ -470,7 +471,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("build sync log channel failed from %s to %s", peer, targetPeer)); } - } catch (IOException | TException e) { + } catch (Exception e) { // We use a simple way to deal with the connection issue when notifying other nodes to // build sync log. If the un-responsible peer is the peer which will be removed, we cannot // suspend the operation and need to skip it. In order to keep the mechanism works fine, @@ -513,7 +514,7 @@ public class IoTConsensusServerImpl { throw new ConsensusGroupModifyPeerException( String.format("remove sync log channel failed from %s to %s", peer, targetPeer)); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("error when removing sync log channel to %s", peer), e); } @@ -545,7 +546,7 @@ public class IoTConsensusServerImpl { res.safeIndex); Thread.sleep(checkIntervalInMs); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { throw new ConsensusGroupModifyPeerException( String.format( "error when waiting %s to complete SyncLog. %s", targetPeer, e.getMessage()), @@ -750,7 +751,7 @@ public class IoTConsensusServerImpl { String.format( "cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus())); } - } catch (IOException | TException e) { + } catch (Exception e) { throw new ConsensusGroupModifyPeerException( String.format("cleanup remote snapshot failed of %s", targetPeer), e); } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index fecccc1403..752a392ed1 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -37,7 +37,6 @@ import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader; import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan; import org.apache.iotdb.metrics.utils.MetricLevel; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -450,7 +449,7 @@ public class LogDispatcher { batch.getEndIndex(), peer.getGroupId().convertToTConsensusGroupId()); client.syncLogEntries(req, handler); - } catch (IOException | TException e) { + } catch (Exception e) { logger.error("Can not sync logs to peer {} because", peer, e); handler.onError(e); } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 151bf17bb4..e00374c347 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ClientPoolProperty; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.IClientPoolFactory; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -73,7 +74,6 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.function.CheckedSupplier; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,7 +243,7 @@ class RatisConsensus implements IConsensus { if (isLeader(consensusGroupId) && CommonDescriptor.getInstance().getConfig().isReadOnly()) { try { forceStepDownLeader(raftGroup); - } catch (IOException e) { + } catch (Exception e) { logger.warn("leader {} read only, force step down failed due to {}", myself, e); } return failedWrite(new NodeReadOnlyException(myself)); @@ -284,7 +284,7 @@ class RatisConsensus implements IConsensus { return failedWrite(new RatisRequestFailedException(reply.getException())); } writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer()); - } catch (IOException | TException e) { + } catch (Exception e) { return failedWrite(new RatisRequestFailedException(e)); } finally { if (client != null) { @@ -357,7 +357,7 @@ class RatisConsensus implements IConsensus { if (!reply.isSuccess()) { return failed(new RatisRequestFailedException(reply.getException())); } - } catch (IOException e) { + } catch (Exception e) { return failed(new RatisRequestFailedException(e)); } finally { if (client != null) { @@ -550,7 +550,7 @@ class RatisConsensus implements IConsensus { if (!reply.isSuccess()) { return failed(new RatisRequestFailedException(reply.getException())); } - } catch (IOException e) { + } catch (Exception e) { return failed(new RatisRequestFailedException(e)); } finally { if (client != null) { @@ -560,13 +560,14 @@ class RatisConsensus implements IConsensus { return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } - private void forceStepDownLeader(RaftGroup group) throws IOException { + private void forceStepDownLeader(RaftGroup group) throws ClientManagerException, IOException { // when newLeaderPeerId == null, ratis forces current leader to step down and raise new // election transferLeader(group, null); } - private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) throws IOException { + private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) + throws ClientManagerException, IOException { RatisClient client = null; try { client = getRaftClient(group); @@ -770,10 +771,10 @@ class RatisConsensus implements IConsensus { Utils.fromPeersAndPriorityToRaftPeers(peers, DEFAULT_PRIORITY)); } - private RatisClient getRaftClient(RaftGroup group) throws IOException { + private RatisClient getRaftClient(RaftGroup group) throws ClientManagerException { try { return clientManager.borrowClient(group); - } catch (IOException e) { + } catch (ClientManagerException e) { logger.error(String.format("Borrow client from pool for group %s failed.", group), e); // rethrow the exception throw e; @@ -792,7 +793,7 @@ class RatisConsensus implements IConsensus { if (!reply.isSuccess()) { throw new RatisRequestFailedException(reply.getException()); } - } catch (IOException e) { + } catch (Exception e) { throw new RatisRequestFailedException(e); } finally { if (client != null) { diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml index cd981ac13e..8c65f3202a 100644 --- a/integration-test/import-control.xml +++ b/integration-test/import-control.xml @@ -70,6 +70,7 @@ <allow class="org.apache.iotdb.commons.cq.CQState" /> <allow class="org.apache.iotdb.consensus.ConsensusFactory" /> <allow class="org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils" /> + <allow class="org.apache.iotdb.commons.client.exception.ClientManagerException" /> <allow pkg="org\.apache\.iotdb\.common\.rpc\.thrift.*" regex="true" /> <allow pkg="org\.apache\.iotdb\.confignode\.rpc\.thrift.*" regex="true" /> <allow pkg="org\.apache\.iotdb\.commons\.client\.sync.*" regex="true" /> diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java index b80a32802d..bd5bd2f247 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java @@ -20,6 +20,7 @@ package org.apache.iotdb.it.env; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; @@ -72,10 +73,17 @@ public abstract class AbstractEnv implements BaseEnv { protected List<DataNodeWrapper> dataNodeWrapperList = Collections.emptyList(); protected String testMethodName = null; + private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager; + protected void initEnvironment(int configNodesNum, int dataNodesNum) { this.configNodeWrapperList = new ArrayList<>(); this.dataNodeWrapperList = new ArrayList<>(); + clientManager = + new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() + .createClientManager( + new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); + final String testClassName = getTestClassName(); final String testMethodName = getTestMethodName(); @@ -164,6 +172,7 @@ public abstract class AbstractEnv implements BaseEnv { logger.error("Delete lock file {} failed", lockPath); } } + clientManager.close(); testMethodName = null; } @@ -432,15 +441,12 @@ public abstract class AbstractEnv implements BaseEnv { @Override public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException, InterruptedException { - IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager = - new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() - .createClientManager( - new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); + for (int i = 0; i < 30; i++) { for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { try { SyncConfigNodeIServiceClient client = - clientManager.purelyBorrowClient( + clientManager.borrowClient( new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort())); TShowClusterResp resp = client.showCluster(); @@ -457,8 +463,10 @@ public abstract class AbstractEnv implements BaseEnv { } } catch (Exception e) { logger.error( - "Borrow ConfigNodeClient from ConfigNode: {} failed, retrying...", - configNodeWrapper.getIpAndPortString()); + String.format( + "Borrow ConfigNodeClient from ConfigNode: %s failed, retrying...", + configNodeWrapper.getIpAndPortString()), + e); } // Sleep 1s before next retry @@ -470,10 +478,6 @@ public abstract class AbstractEnv implements BaseEnv { @Override public int getLeaderConfigNodeIndex() throws IOException, InterruptedException { - IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager = - new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() - .createClientManager( - new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); for (int retry = 0; retry < 30; retry++) { for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) { ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId); @@ -486,7 +490,7 @@ public abstract class AbstractEnv implements BaseEnv { if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return configNodeId; } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error( "Borrow ConfigNodeClient from ConfigNode: {} failed because: {}, retrying...", configNodeWrapper.getIp(), diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java index c4c0d77d20..2cc2d1f926 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java @@ -20,6 +20,7 @@ package org.apache.iotdb.it.env; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.db.client.DataNodeClientPoolFactory; @@ -41,10 +42,12 @@ import static org.apache.iotdb.jdbc.Config.VERSION; import static org.junit.Assert.fail; public class RemoteServerEnv implements BaseEnv { - private String ip_addr = System.getProperty("RemoteIp", "127.0.0.1"); - private String port = System.getProperty("RemotePort", "6667"); - private String user = System.getProperty("RemoteUser", "root"); - private String password = System.getProperty("RemotePassword", "root"); + + private final String ip_addr = System.getProperty("RemoteIp", "127.0.0.1"); + private final String port = System.getProperty("RemotePort", "6667"); + private final String user = System.getProperty("RemoteUser", "root"); + private final String password = System.getProperty("RemotePassword", "root"); + private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager; @Override public void initBeforeClass() { @@ -56,6 +59,10 @@ public class RemoteServerEnv implements BaseEnv { e.printStackTrace(); fail(e.getMessage()); } + clientManager = + new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() + .createClientManager( + new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); } @Override @@ -64,7 +71,9 @@ public class RemoteServerEnv implements BaseEnv { } @Override - public void cleanAfterClass() {} + public void cleanAfterClass() { + clientManager.close(); + } @Override public void initBeforeTest() { @@ -79,7 +88,9 @@ public class RemoteServerEnv implements BaseEnv { } @Override - public void cleanAfterTest() {} + public void cleanAfterTest() { + clientManager.close(); + } @Override public Connection getConnection(String username, String password) throws SQLException { @@ -151,15 +162,8 @@ public class RemoteServerEnv implements BaseEnv { } @Override - public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException { - IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager = - new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>() - .createClientManager( - new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory()); - try (SyncConfigNodeIServiceClient client = - clientManager.borrowClient(new TEndPoint(ip_addr, 22277))) { - return client; - } + public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws ClientManagerException { + return clientManager.borrowClient(new TEndPoint(ip_addr, 22277)); } @Override diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 346a7cd581..fbca0d8762 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.itbase.env; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.isession.ISession; import org.apache.iotdb.isession.SessionConfig; @@ -73,7 +74,7 @@ public interface BaseEnv { void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList); IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() - throws IOException, InterruptedException; + throws ClientManagerException, IOException, InterruptedException; default ISession getSessionConnection() throws IoTDBConnectionException { return getSessionConnection( diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java index d8d4f5d071..ca6493423f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java @@ -117,8 +117,7 @@ public class IoTDBConfigNodeSnapshotIT { } @Test - public void testPartitionInfoSnapshot() - throws IOException, IllegalPathException, TException, InterruptedException { + public void testPartitionInfoSnapshot() throws Exception { final String sg = "root.sg"; final int storageGroupNum = 10; final int seriesPartitionSlotsNum = 10; diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java index 0010a56420..b305c07ae7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java @@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.it; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp; import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq; @@ -36,7 +35,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -44,7 +42,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,8 +62,7 @@ public class IoTDBStorageGroupIT { } @Test - public void testSetAndQueryStorageGroup() - throws IOException, TException, IllegalPathException, InterruptedException { + public void testSetAndQueryStorageGroup() throws Exception { TSStatus status; final String sg0 = "root.sg0"; final String sg1 = "root.sg1"; @@ -156,7 +152,7 @@ public class IoTDBStorageGroupIT { } @Test - public void testDeleteStorageGroup() throws TException, IOException, InterruptedException { + public void testDeleteStorageGroup() throws Exception { TSStatus status; final String sg0 = "root.sg0"; final String sg1 = "root.sg1"; diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java index 2c924c9589..65b8bedad1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java @@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.it.cluster; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; @@ -86,7 +87,8 @@ public class IoTDBClusterNodeErrorStartUpIT { } @Test - public void testConflictNodeRegistration() throws IOException, InterruptedException, TException { + public void testConflictNodeRegistration() + throws ClientManagerException, InterruptedException, TException, IOException { /* Test ConfigNode conflict register */ // Construct a ConfigNodeWrapper that conflicts in consensus port with an existed one. @@ -142,7 +144,8 @@ public class IoTDBClusterNodeErrorStartUpIT { } @Test - public void testIllegalNodeRestart() throws IOException, InterruptedException, TException { + public void testIllegalNodeRestart() + throws ClientManagerException, IOException, InterruptedException, TException { ConfigNodeWrapper registeredConfigNodeWrapper = EnvFactory.getEnv().getConfigNodeWrapper(1); DataNodeWrapper registeredDataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java index dcaf66018b..79bbd29c38 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java @@ -42,7 +42,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,7 +49,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -100,7 +98,7 @@ public class IoTDBClusterNodeGetterIT { } @Test - public void showClusterAndNodesTest() throws IOException, InterruptedException, TException { + public void showClusterAndNodesTest() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { @@ -181,7 +179,7 @@ public class IoTDBClusterNodeGetterIT { } @Test - public void removeAndStopConfigNodeTest() throws TException, IOException, InterruptedException { + public void removeAndStopConfigNodeTest() throws Exception { TShowClusterResp showClusterResp; TSStatus status; @@ -217,7 +215,7 @@ public class IoTDBClusterNodeGetterIT { } @Test - public void queryAndRemoveDataNodeTest() throws TException, IOException, InterruptedException { + public void queryAndRemoveDataNodeTest() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java index 18f1d2f868..faf6a6ccdb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java @@ -40,7 +40,6 @@ import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.env.BaseConfig; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -48,7 +47,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -112,7 +110,7 @@ public class IoTDBClusterRegionLeaderBalancingIT { } @Test - public void testGreedyLeaderDistribution() throws IOException, InterruptedException, TException { + public void testGreedyLeaderDistribution() throws Exception { final int testConfigNodeNum = 1; final int testDataNodeNum = 3; EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum); @@ -165,7 +163,7 @@ public class IoTDBClusterRegionLeaderBalancingIT { } @Test - public void testMCFLeaderDistribution() throws IOException, InterruptedException, TException { + public void testMCFLeaderDistribution() throws Exception { final int testConfigNodeNum = 1; final int testDataNodeNum = 3; final int retryNum = 50; diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java index 8205a3e5a6..81abd2087c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java @@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; @@ -38,7 +37,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -118,8 +116,7 @@ public class IoTDBConfigNodeSwitchLeaderIT { } @Test - public void basicDataInheritIT() - throws IOException, TException, IllegalPathException, InterruptedException { + public void basicDataInheritIT() throws Exception { final String sg0 = "root.sg0"; final String sg1 = "root.sg1"; final String d00 = sg0 + ".d0.s"; diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java index d35205f274..32282d19ab 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java @@ -48,7 +48,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -113,8 +112,7 @@ public class IoTDBAutoRegionGroupExtensionIT { } @Test - public void testAutoRegionGroupExtensionPolicy() - throws IOException, InterruptedException, TException { + public void testAutoRegionGroupExtensionPolicy() throws Exception { final int retryNum = 100; diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java index effb799fbc..0d22f3982c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java @@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; @@ -41,7 +40,6 @@ import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.env.BaseConfig; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -49,7 +47,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -131,8 +128,7 @@ public class IoTDBCustomRegionGroupExtensionIT { } @Test - public void testCustomRegionGroupExtensionPolicy() - throws IOException, InterruptedException, TException, IllegalPathException { + public void testCustomRegionGroupExtensionPolicy() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java index c9de411582..665155e8ae 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java @@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; @@ -50,7 +49,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -60,7 +58,6 @@ import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -129,7 +126,7 @@ public class IoTDBPartitionDurableIT { setStorageGroup(); } - private void setStorageGroup() throws IOException, InterruptedException, TException { + private void setStorageGroup() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { TSetStorageGroupReq setStorageGroupReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg)); @@ -156,8 +153,7 @@ public class IoTDBPartitionDurableIT { } @Test - public void testRemovingDataNode() - throws IOException, InterruptedException, TException, IllegalPathException { + public void testRemovingDataNode() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { @@ -283,7 +279,7 @@ public class IoTDBPartitionDurableIT { } @Test - public void testReadOnlyDataNode() throws IOException, InterruptedException, TException { + public void testReadOnlyDataNode() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { @@ -434,7 +430,7 @@ public class IoTDBPartitionDurableIT { } @Test - public void testUnknownDataNode() throws IOException, TException, InterruptedException { + public void testUnknownDataNode() throws Exception { // Shutdown a DataNode, the ConfigNode should still be able to create RegionGroup EnvFactory.getEnv().shutdownDataNode(testDataNodeId); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java index 49f7b1ce60..f50fb390d4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java @@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; @@ -54,7 +53,6 @@ import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.env.BaseConfig; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -64,7 +62,6 @@ import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -137,8 +134,7 @@ public class IoTDBPartitionGetterIT { prepareData(); } - private static void prepareData() - throws IOException, InterruptedException, TException, IllegalPathException { + private static void prepareData() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { /* Set StorageGroups */ @@ -242,8 +238,7 @@ public class IoTDBPartitionGetterIT { } @Test - public void testGetSchemaPartition() - throws TException, IOException, IllegalPathException, InterruptedException { + public void testGetSchemaPartition() throws Exception { final String sg = "root.sg"; final String sg0 = "root.sg0"; final String sg1 = "root.sg1"; @@ -306,7 +301,7 @@ public class IoTDBPartitionGetterIT { } @Test - public void testGetDataPartition() throws TException, IOException, InterruptedException { + public void testGetDataPartition() throws Exception { final int seriesPartitionBatchSize = 100; final int timePartitionBatchSize = 10; @@ -384,8 +379,7 @@ public class IoTDBPartitionGetterIT { } @Test - public void testGetSlots() - throws TException, IOException, IllegalPathException, InterruptedException { + public void testGetSlots() throws Exception { final String sg0 = "root.sg0"; final String sg1 = "root.sg1"; @@ -532,8 +526,7 @@ public class IoTDBPartitionGetterIT { } @Test - public void testGetSchemaNodeManagementPartition() - throws IOException, TException, IllegalPathException, InterruptedException { + public void testGetSchemaNodeManagementPartition() throws Exception { TSchemaNodeManagementReq nodeManagementReq; TSchemaNodeManagementResp nodeManagementResp; diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java index 6e8ae39c83..f9dd8af7e9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java @@ -36,7 +36,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -46,7 +45,6 @@ import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -127,8 +125,7 @@ public class IoTDBPartitionInheritPolicyIT { } @Test - public void testDataPartitionInheritPolicy() - throws TException, IOException, InterruptedException { + public void testDataPartitionInheritPolicy() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java index 4054cbc4fe..363decbf2d 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java @@ -51,11 +51,12 @@ public class ClientFactoryProperty { } public static class Builder { - // whether to use thrift compression + + /** whether to use thrift compression. */ private boolean rpcThriftCompressionEnabled = DefaultProperty.RPC_THRIFT_COMPRESSED_ENABLED; - // socket timeout for thrift client + /** socket timeout for thrift client. */ private int connectionTimeoutMs = DefaultProperty.CONNECTION_TIMEOUT_MS; - // number of selector threads for asynchronous thrift client in a clientManager + /** number of selector threads for asynchronous thrift client in a clientManager. */ private int selectorNumOfAsyncClientManager = DefaultProperty.SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 6c826aa22e..66836967de 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -19,14 +19,14 @@ package org.apache.iotdb.commons.client; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.commons.pool2.KeyedObjectPool; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.util.Optional; public class ClientManager<K, V> implements IClientManager<K, V> { @@ -44,60 +44,44 @@ public class ClientManager<K, V> implements IClientManager<K, V> { } @Override - public V borrowClient(K node) throws IOException { - V client; + public V borrowClient(K node) throws ClientManagerException { try { - client = pool.borrowObject(node); - } catch (TTransportException e) { - // external needs to check transport related exception - throw new IOException(e); - } catch (IOException e) { - // external needs the IOException to check connection - throw e; + return pool.borrowObject(node); } catch (Exception e) { - // external doesn't care of other exceptions - String errorMessage = - String.format( - "Borrow client from pool for node %s failed, you need to increase dn_max_connection_for_internal_service.", - node); - logger.warn(errorMessage, e); - throw new IOException(errorMessage, e); + throw new ClientManagerException(e); } - return client; } - @Override - public V purelyBorrowClient(K node) { - V client = null; - try { - client = pool.borrowObject(node); - } catch (Exception ignored) { - // Just ignore - } - return client; - } - - // return a V client of the K node to the Manager + /** + * return a client V for node K to the ClientManager Note: We do not define this interface in + * IClientManager to make you aware that the return of a client is automatic whenever a particular + * client is used. + */ public void returnClient(K node, V client) { - if (client != null && node != null) { - try { - pool.returnObject(node, client); - } catch (Exception e) { - logger.error( - String.format("Return client %s for node %s to pool failed.", client, node), e); - } - } + Optional.ofNullable(node) + .ifPresent( + x -> { + try { + pool.returnObject(node, client); + } catch (Exception e) { + logger.error( + String.format("Return client %s for node %s to pool failed.", client, node), e); + } + }); } @Override public void clear(K node) { - if (node != null) { - try { - pool.clear(node); - } catch (Exception e) { - logger.error(String.format("Clear all client in pool for node %s failed.", node), e); - } - } + Optional.ofNullable(node) + .ifPresent( + x -> { + try { + pool.clear(node); + } catch (Exception e) { + logger.error( + String.format("Clear all client in pool for node %s failed.", node), e); + } + }); } @Override @@ -105,7 +89,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> { try { pool.close(); } catch (Exception e) { - logger.error("close client pool failed", e); + logger.error("Close client pool failed", e); } } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java index c764692564..45870e9ba6 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java @@ -37,14 +37,20 @@ public class ClientPoolProperty<V> { } public static class Builder<V> { - // when the number of the client to a single node exceeds maxTotalConnectionForEachNode, the - // current thread will block waitClientTimeoutMS, ClientManager returns NULL if there are no - // clients after the block time + + /** + * when the number of the client to a single node exceeds maxTotalConnectionForEachNode, the + * current thread will block waitClientTimeoutMS, ClientManager throws ClientManagerException if + * there are no clients after the block time. + */ private long waitClientTimeoutMS = DefaultProperty.WAIT_CLIENT_TIMEOUT_MS; - // the maximum number of clients that can be applied for a node + + /** the maximum number of clients that can be allocated for a node. */ private int maxTotalClientForEachNode = DefaultProperty.MAX_TOTAL_CLIENT_FOR_EACH_NODE; - // the maximum number of clients that can be idle for a node. When the number of idle clients on - // a node exceeds this number, newly returned clients will be released + /** + * the maximum number of clients that can be idle for a node. When the number of idle clients on + * a node exceeds this number, newly returned clients will be released. + */ private int maxIdleClientForEachNode = DefaultProperty.MAX_IDLE_CLIENT_FOR_EACH_NODE; public Builder<V> setWaitClientTimeoutMS(long waitClientTimeoutMS) { diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java index 4dfaf6b362..2b74fc716f 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java @@ -19,28 +19,24 @@ package org.apache.iotdb.commons.client; -import javax.annotation.concurrent.ThreadSafe; +import org.apache.iotdb.commons.client.exception.ClientManagerException; -import java.io.IOException; +import javax.annotation.concurrent.ThreadSafe; @ThreadSafe public interface IClientManager<K, V> { - // get a V client of the K node from the Manager - V borrowClient(K node) throws IOException; + /** get a client V for node K from the IClientManager. */ + V borrowClient(K node) throws ClientManagerException; - // Get a V client of the K node from the Manager while - // no exceptions will be thrown and no logs will be printed. - // This interface is mainly used to process the cluster heartbeat. - V purelyBorrowClient(K node); - - // clear all clients for K node + /** clear all clients for node K. */ void clear(K node); - // close clientManager + /** close IClientManager, which means closing all clients for all nodes. */ void close(); class Factory<K, V> { + public IClientManager<K, V> createClientManager(IClientPoolFactory<K, V> clientPoolFactory) { return new ClientManager<>(clientPoolFactory); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java index 7e3e95a019..5f2c7a72f6 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java @@ -22,7 +22,10 @@ package org.apache.iotdb.commons.client; import org.apache.commons.pool2.KeyedObjectPool; public interface IClientPoolFactory<K, V> { - // We can implement this interface in other modules and then set the corresponding expected - // parameters and client factory classes + + /** + * We can implement this interface in other modules and then set the corresponding expected + * parameters and client factory classes. + */ KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java similarity index 70% copy from node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java copy to node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java index 7e3e95a019..439f25b655 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java @@ -17,12 +17,10 @@ * under the License. */ -package org.apache.iotdb.commons.client; +package org.apache.iotdb.commons.client.exception; -import org.apache.commons.pool2.KeyedObjectPool; - -public interface IClientPoolFactory<K, V> { - // We can implement this interface in other modules and then set the corresponding expected - // parameters and client factory classes - KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager); +public class ClientManagerException extends Exception { + public ClientManagerException(Exception exception) { + super(exception); + } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java index 5ce1746799..f7c7f05e24 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java @@ -31,7 +31,7 @@ public class ThriftServiceThread extends AbstractThriftServiceThread { /** for asynced ThriftService. */ @SuppressWarnings("squid:S107") public ThriftServiceThread( - TBaseAsyncProcessor processor, + TBaseAsyncProcessor<?> processor, String serviceName, String threadsName, String bindAddress, diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java index 05169af548..18bf4bcd38 100644 --- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java +++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.client; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.mock.MockInternalRPCService; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.exception.StartupException; @@ -34,6 +35,7 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; @@ -251,14 +253,15 @@ public class ClientManagerTest { // get another sync client, should wait waitClientTimeoutMS ms, throw error SyncDataNodeInternalServiceClient syncClient2 = null; - long start = 0, end; + long start = 0; try { start = System.nanoTime(); syncClient2 = syncClusterManager.borrowClient(endPoint); - } catch (IOException e) { - end = System.nanoTime(); + } catch (ClientManagerException e) { + long end = System.nanoTime(); Assert.assertTrue(end - start >= waitClientTimeoutMs * 1_000_000); - Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for node")); + Assert.assertTrue(e.getCause() instanceof NoSuchElementException); + Assert.assertTrue(e.getMessage().contains("Timeout waiting for idle object")); } Assert.assertNull(syncClient2); @@ -322,14 +325,15 @@ public class ClientManagerTest { Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint)); // get another sync client, should wait waitClientTimeoutMS ms, throw error - long start = 0, end; + long start = 0; try { start = System.nanoTime(); - syncClusterManager.borrowClient(endPoint); - } catch (IOException e) { - end = System.nanoTime(); + syncClient1 = syncClusterManager.borrowClient(endPoint); + } catch (ClientManagerException e) { + long end = System.nanoTime(); Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000); - Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for node")); + Assert.assertTrue(e.getCause() instanceof NoSuchElementException); + Assert.assertTrue(e.getMessage().contains("Timeout waiting for idle object")); } // return one sync client diff --git a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java index 8b3d441e14..a41ef93b2b 100644 --- a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.auth.entity.PathPrivilege; import org.apache.iotdb.commons.auth.entity.Role; import org.apache.iotdb.commons.auth.entity.User; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.path.PartialPath; @@ -50,7 +51,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -141,7 +141,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error("Failed to connect to config node."); future.setException(e); } catch (AuthException e) { @@ -177,7 +177,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { } else { AuthorizerManager.getInstance().buildTSBlock(authorizerResp.getAuthorizerInfo(), future); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error("Failed to connect to config node."); authorizerResp.setStatus( RpcUtils.getStatus( @@ -213,7 +213,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server status = configNodeClient.login(req); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error("Failed to connect to config node."); status = new TPermissionInfoResp(); status.setStatus( @@ -240,7 +240,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server permissionInfoResp = configNodeClient.checkUserPrivileges(req); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.error("Failed to connect to config node."); permissionInfoResp = new TPermissionInfoResp(); permissionInfoResp.setStatus( diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java index ea81e4401b..19564f2528 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.metadata.template; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; @@ -41,7 +42,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -98,7 +98,7 @@ public class ClusterTemplateManager implements ITemplateManager { tsStatus); } return tsStatus; - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new RuntimeException( new IoTDBException( "create template error.", e, TSStatusCode.CREATE_TEMPLATE_ERROR.getStatusCode())); @@ -140,7 +140,7 @@ public class ClusterTemplateManager implements ITemplateManager { tGetAllTemplatesResp.getStatus().getMessage(), tGetAllTemplatesResp.getStatus().getCode())); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new RuntimeException( new IoTDBException( "get all template error.", TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode())); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 943d24a6e5..50a3f2e1e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -402,7 +402,7 @@ public class SinkHandle implements ISinkHandle { mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) { client.onNewDataBlockEvent(newDataBlockEvent); break; - } catch (Throwable e) { + } catch (Exception e) { logger.warn("Failed to send new data block event, attempt times: {}", attempt, e); if (attempt == MAX_ATTEMPT_TIMES) { sinkHandleListener.onFailure(SinkHandle.this, e); @@ -442,7 +442,7 @@ public class SinkHandle implements ISinkHandle { mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) { client.onEndOfDataBlockEvent(endOfDataBlockEvent); break; - } catch (Throwable e) { + } catch (Exception e) { logger.warn("Failed to send end of data block event, attempt times: {}", attempt, e); if (attempt == MAX_ATTEMPT_TIMES) { logger.warn("Failed to send end of data block event after all retry", e); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java index 72bced5f55..b9a358c2c3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java @@ -37,9 +37,6 @@ import org.apache.iotdb.db.client.DataNodeClientPoolFactory; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; -import org.apache.thrift.TException; - -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -75,7 +72,7 @@ public class TestRPCClient { new TTriggerSnapshotLoadReq( new DataRegionId(1).convertToTConsensusGroupId(), "snapshot_1_1662370255552")); System.out.println(res.status); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -87,7 +84,7 @@ public class TestRPCClient { client.inactivatePeer( new TInactivatePeerReq(new DataRegionId(1).convertToTConsensusGroupId())); System.out.println(res.status); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -97,7 +94,7 @@ public class TestRPCClient { INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) { client.removeRegionPeer( new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3))); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -107,7 +104,7 @@ public class TestRPCClient { INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) { client.addRegionPeer( new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3))); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -169,7 +166,7 @@ public class TestRPCClient { TSStatus res = client.createDataRegion(req); System.out.println(res.code + " " + res.message); - } catch (IOException | TException e) { + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java index 42e379beb9..2c70aaf3d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.partition.DataPartition; @@ -62,6 +63,7 @@ import java.util.List; import java.util.Map; public class ClusterPartitionFetcher implements IPartitionFetcher { + private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -74,6 +76,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory()); private static final class ClusterPartitionFetcherHolder { + private static final ClusterPartitionFetcher INSTANCE = new ClusterPartitionFetcher(); private ClusterPartitionFetcherHolder() {} @@ -115,7 +118,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { } } return schemaPartition; - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { logger.warn("Get Schema Partition error", e); throw new StatementAnalyzeException( "An error occurred when executing getSchemaPartition():" + e.getMessage()); @@ -147,7 +150,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { } } return schemaPartition; - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getOrCreateSchemaPartition():" + e.getMessage()); } @@ -164,7 +167,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { constructSchemaNodeManagementPartitionReq(patternTree, level)); return parseSchemaNodeManagementPartitionResp(schemaNodeManagementResp); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getSchemaNodeManagementPartition():" + e.getMessage()); } @@ -188,7 +191,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { "An error occurred when executing getDataPartition():" + dataPartitionTableResp.getStatus().getMessage()); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getDataPartition():" + e.getMessage()); } @@ -214,7 +217,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { "An error occurred when executing getDataPartition():" + dataPartitionTableResp.getStatus().getMessage()); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getDataPartition():" + e.getMessage()); } @@ -239,7 +242,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { "An error occurred when executing getOrCreateDataPartition():" + dataPartitionTableResp.getStatus().getMessage()); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getOrCreateDataPartition():" + e.getMessage()); } @@ -272,7 +275,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher { dataPartitionTableResp.getStatus().getMessage(), dataPartitionTableResp.getStatus().getCode())); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getOrCreateDataPartition():" + e.getMessage()); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java index 3b5b25967f..040c568b40 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; @@ -56,7 +57,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -184,7 +184,8 @@ public class PartitionCache { * @param devicePaths the devices that need to hit */ private void fetchStorageGroupAndUpdateCache( - StorageGroupCacheResult<?> result, List<String> devicePaths) throws IOException, TException { + StorageGroupCacheResult<?> result, List<String> devicePaths) + throws ClientManagerException, TException { try (ConfigNodeClient client = configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) { storageGroupCacheLock.writeLock().lock(); @@ -215,7 +216,7 @@ public class PartitionCache { */ private void createStorageGroupAndUpdateCache( StorageGroupCacheResult<?> result, List<String> devicePaths) - throws IOException, MetadataException, TException { + throws ClientManagerException, MetadataException, TException { try (ConfigNodeClient client = configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) { storageGroupCacheLock.writeLock().lock(); @@ -336,7 +337,7 @@ public class PartitionCache { throw new StatementAnalyzeException("Failed to get database Map in three attempts."); } } - } catch (TException | MetadataException | IOException e) { + } catch (TException | MetadataException | ClientManagerException e) { throw new StatementAnalyzeException( "An error occurred when executing getDeviceToStorageGroup():" + e.getMessage()); } @@ -421,7 +422,7 @@ public class PartitionCache { throw new RuntimeException( "Failed to get replicaSet of consensus group[id= " + consensusGroupId + "]"); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { throw new StatementAnalyzeException( "An error occurred when executing getRegionReplicaSet():" + e.getMessage()); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java index df90f2c977..a20a5cf3b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.exception.IoTDBException; @@ -215,7 +216,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -234,7 +235,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { TShowStorageGroupResp resp = client.showStorageGroup(storageGroupPathPattern); // build TSBlock ShowStorageGroupTask.buildTSBlock(resp.getStorageGroupInfoMap(), future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -253,7 +254,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { storageGroupNum = resp.getCount(); // build TSBlock CountStorageGroupTask.buildTSBlock(storageGroupNum, future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -277,7 +278,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -399,7 +400,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | IOException | TException e) { future.setException(e); } return future; @@ -418,7 +419,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -438,7 +439,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } // convert triggerTable and buildTsBlock ShowFunctionsTask.buildTsBlock(getUDFTableResp.getAllUDFInformation(), future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } @@ -574,7 +575,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException | IOException e) { future.setException(e); } return future; @@ -592,7 +593,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -613,7 +614,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } // convert triggerTable and buildTsBlock ShowTriggersTask.buildTsBlock(getTriggerTableResp.getAllTriggerInformation(), future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } @@ -641,7 +642,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -656,7 +657,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.merge(); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -679,7 +680,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.flush(tFlushReq); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -702,7 +703,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.clearCache(); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -725,7 +726,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.loadConfiguration(); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -748,7 +749,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { // Send request to some API server tsStatus = client.setSystemStatus(status.getStatus()); - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } } else { @@ -769,7 +770,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) { showClusterResp = client.showCluster(); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { if (showClusterResp.getConfigNodeList() == null) { future.setException(new TException(MSG_RECONNECTION_FAIL)); } else { @@ -815,7 +816,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } } } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } // build TSBlock @@ -846,7 +847,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { showRegionResp.getStatus().message, showRegionResp.getStatus().code)); return future; } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } // build TSBlock @@ -868,7 +869,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { showDataNodesResp.getStatus().message, showDataNodesResp.getStatus().code)); return future; } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } // build TSBlock @@ -890,7 +891,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { showConfigNodesResp.getStatus().message, showConfigNodesResp.getStatus().code)); return future; } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } // build TSBlock @@ -1030,7 +1031,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1055,7 +1056,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1114,7 +1115,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1329,7 +1330,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1449,7 +1450,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1467,7 +1468,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } return future; @@ -1486,7 +1487,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { } // convert cqList and buildTsBlock ShowContinuousQueriesTask.buildTsBlock(showCQResp.getCqList(), future); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { future.setException(e); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java index e71f331cdf..bebb38f2b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.QueryStateMachine; @@ -35,7 +36,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp; import org.apache.thrift.TException; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -70,7 +70,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT public abstract void abort(); protected FragmentInstanceInfo fetchInstanceInfo(FragmentInstance instance) - throws TException, IOException { + throws ClientManagerException, TException { TEndPoint endPoint = instance.getHostDataNode().internalEndPoint; if (isInstanceRunningLocally(endPoint)) { FragmentInstanceInfo info = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java index 613e0011ba..80b90b9a21 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; @@ -35,7 +36,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -131,7 +131,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { updateQueryState(instance.getId(), instanceInfo); } - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { // TODO: do nothing ? logger.warn("error happened while fetching query state", e); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index 1ba07ccb79..db04e97f0b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -47,7 +48,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -193,7 +193,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown query type [%s]", instance.getType()))); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { logger.warn("can't connect to node {}", endPoint, e); TSStatus status = new TSStatus(); status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java index bda8205edf..14bace8dc2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.QueryId; @@ -28,10 +29,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -99,11 +100,11 @@ public class SimpleQueryTerminator implements IQueryTerminator { try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs)); - } catch (IOException e) { + } catch (ClientManagerException e) { logger.warn("can't connect to node {}", endPoint, e); // we shouldn't return here and need to cancel queryTasks in other nodes succeed = false; - } catch (Throwable t) { + } catch (TException t) { logger.warn("cancel query {} on node {} failed.", queryId.getId(), endPoint, t); // we shouldn't return here and need to cancel queryTasks in other nodes succeed = false; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java index dc36fce471..0323f45ba3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -50,7 +51,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -135,7 +135,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { logger.warn(loadResp.message); throw new FragmentInstanceDispatchException(loadResp.status); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { logger.warn("can't connect to node {}", endPoint, e); TSStatus status = new TSStatus(); status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); @@ -223,7 +223,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { logger.warn(loadResp.message); throw new FragmentInstanceDispatchException(loadResp.status); } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { logger.warn("can't connect to node {}", endPoint, e); TSStatus status = new TSStatus(); status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()); diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java index fc1a7d056b..8d5eb5cdcf 100644 --- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.trigger.executor; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.commons.path.PartialPath; @@ -340,7 +341,7 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv Thread.sleep(4000); } } - } catch (IOException | TException e) { + } catch (ClientManagerException | TException e) { // IOException means that we failed to borrow client, possibly because corresponding // DataNode is down. // TException means there's a timeout or broken connection. @@ -353,7 +354,10 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv e); // update TDataNodeLocation of stateful trigger through config node updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId()); - } catch (Throwable e) { + } catch (InterruptedException e) { + LOGGER.warn("{} interrupted when sleep", triggerName); + Thread.currentThread().interrupt(); + } catch (Exception e) { LOGGER.warn( "Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}", triggerName, @@ -412,7 +416,7 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv } } return false; - } catch (TException | IOException e) { + } catch (ClientManagerException | TException | IOException e) { LOGGER.error( "Failed to update location of stateful trigger({}) through config node and the cause is {}.", triggerName, diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java index 029eac1ccf..57f121f40f 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener; @@ -74,7 +75,7 @@ public class SinkHandleTest { Mockito.doNothing() .when(mockClient) .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -223,7 +224,7 @@ public class SinkHandleTest { Mockito.doNothing() .when(mockClient) .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -423,7 +424,7 @@ public class SinkHandleTest { Mockito.doThrow(mockException) .when(mockClient) .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -532,7 +533,7 @@ public class SinkHandleTest { Mockito.doNothing() .when(mockClient) .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java index 7c345b5d9f..aa9adbace6 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; @@ -37,7 +38,6 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -81,7 +81,7 @@ public class SourceHandleTest { }) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -197,7 +197,7 @@ public class SourceHandleTest { }) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -351,7 +351,7 @@ public class SourceHandleTest { }) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -523,7 +523,7 @@ public class SourceHandleTest { Mockito.doThrow(mockException) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); } @@ -606,7 +606,7 @@ public class SourceHandleTest { }) .when(mockClient) .getDataBlock(Mockito.any(TGetDataBlockRequest.class)); - } catch (TException | IOException e) { + } catch (ClientManagerException | TException e) { e.printStackTrace(); Assert.fail(); }
