This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-6013-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5a69816799d41d4839a25dea29a095f2833a5cfc Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Jun 20 11:35:09 2023 +0800 [IOTDB-6013] Pipe: pipe-related threads (pools) should not be initialized unless necessary --- .../manager/pipe/runtime/PipeHeartbeatParser.java | 47 ++++++++++++---------- .../pipe/runtime/PipeLeaderChangeHandler.java | 14 ++++--- .../pipe/runtime/PipeRuntimeCoordinator.java | 22 ++++++++-- .../pipe/connector/v2/IoTDBThriftConnectorV2.java | 47 ++++++++++++++-------- 4 files changed, 84 insertions(+), 46 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java index 537a5e7d1cc..5e5564758dc 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java @@ -90,24 +90,29 @@ public class PipeHeartbeatParser { return; } - PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit( - () -> { - if (!pipeMetaByteBufferListFromDataNode.isEmpty()) { - parseHeartbeatAndSaveMetaChangeLocally(dataNodeId, pipeMetaByteBufferListFromDataNode); - } - - if (canSubmitHandleMetaChangeProcedure.get() - && (needWriteConsensusOnConfigNodes.get() || needPushPipeMetaToDataNodes.get())) { - configManager - .getProcedureManager() - .pipeHandleMetaChange( - needWriteConsensusOnConfigNodes.get(), needPushPipeMetaToDataNodes.get()); - - // reset flags after procedure is submitted - needWriteConsensusOnConfigNodes.set(false); - needPushPipeMetaToDataNodes.set(false); - } - }); + configManager + .getPipeManager() + .getPipeRuntimeCoordinator() + .getProcedureSubmitter() + .submit( + () -> { + if (!pipeMetaByteBufferListFromDataNode.isEmpty()) { + parseHeartbeatAndSaveMetaChangeLocally( + dataNodeId, pipeMetaByteBufferListFromDataNode); + } + + if (canSubmitHandleMetaChangeProcedure.get() + && (needWriteConsensusOnConfigNodes.get() || needPushPipeMetaToDataNodes.get())) { + configManager + .getProcedureManager() + .pipeHandleMetaChange( + needWriteConsensusOnConfigNodes.get(), needPushPipeMetaToDataNodes.get()); + + // reset flags after procedure is submitted + needWriteConsensusOnConfigNodes.set(false); + needPushPipeMetaToDataNodes.set(false); + } + }); } private void parseHeartbeatAndSaveMetaChangeLocally( @@ -197,9 +202,9 @@ public class PipeHeartbeatParser { needPushPipeMetaToDataNodes.set(true); LOGGER.warn( - String.format( - "Detect PipeRuntimeCriticalException %s from DataNode, stop pipe %s.", - exception, pipeName)); + "Detect PipeRuntimeCriticalException {} from DataNode, stop pipe {}.", + exception, + pipeName); } if (exception instanceof PipeRuntimeConnectorCriticalException) { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java index 3f93fb022b9..1e91bd55cc4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java @@ -78,10 +78,14 @@ public class PipeLeaderChangeHandler implements IClusterStatusSubscriber { } // submit procedure in an async way to avoid blocking the caller - PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit( - () -> - configManager - .getProcedureManager() - .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap)); + configManager + .getPipeManager() + .getPipeRuntimeCoordinator() + .getProcedureSubmitter() + .submit( + () -> + configManager + .getProcedureManager() + .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap)); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java index 23398038b55..59ed8e5056c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java @@ -31,24 +31,40 @@ import org.jetbrains.annotations.NotNull; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { // shared thread pool in the runtime package - static final ExecutorService PROCEDURE_SUBMITTER = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( - ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName()); + private static final AtomicReference<ExecutorService> procedureSubmitterHolder = + new AtomicReference<>(); + private final ExecutorService procedureSubmitter; private final PipeLeaderChangeHandler pipeLeaderChangeHandler; private final PipeHeartbeatParser pipeHeartbeatParser; private final PipeMetaSyncer pipeMetaSyncer; public PipeRuntimeCoordinator(ConfigManager configManager) { + if (procedureSubmitterHolder.get() == null) { + synchronized (PipeRuntimeCoordinator.class) { + if (procedureSubmitterHolder.get() == null) { + procedureSubmitterHolder.set( + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName())); + } + } + } + procedureSubmitter = procedureSubmitterHolder.get(); + pipeLeaderChangeHandler = new PipeLeaderChangeHandler(configManager); pipeHeartbeatParser = new PipeHeartbeatParser(configManager); pipeMetaSyncer = new PipeMetaSyncer(configManager); } + public ExecutorService getProcedureSubmitter() { + return procedureSubmitter; + } + @Override public void onClusterStatisticsChanged(StatisticsChangeEvent event) { pipeLeaderChangeHandler.onClusterStatisticsChanged(event); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java index edba5374f75..0e76e1d0d5c 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java @@ -73,15 +73,16 @@ import static org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON public class IoTDBThriftConnectorV2 implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV2.class); + private static final String FAILED_TO_BORROW_CLIENT_FORMATTER = + "Failed to borrow client from client pool for receiver %s:%s."; private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig(); - private static final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> - ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER = - new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>() - .createClientManager( - new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()); + private static volatile IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> + asyncPipeDataTransferClientManagerHolder; + private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> + asyncPipeDataTransferClientManager; private final AtomicBoolean isClosed = new AtomicBoolean(false); @@ -92,6 +93,21 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { private List<TEndPoint> nodeUrls; + public IoTDBThriftConnectorV2() { + if (asyncPipeDataTransferClientManagerHolder == null) { + synchronized (IoTDBThriftConnectorV2.class) { + if (asyncPipeDataTransferClientManagerHolder == null) { + asyncPipeDataTransferClientManagerHolder = + new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>() + .createClientManager( + new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()); + } + } + } + + asyncPipeDataTransferClientManager = asyncPipeDataTransferClientManagerHolder; + } + public synchronized void commit(long requestCommitId, @Nullable EnrichedEvent enrichedEvent) { commitQueue.offer( new Pair<>( @@ -151,11 +167,11 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { throw new PipeException(String.format("Handshake error, result status %s.", resp.status)); } } catch (TException e) { - LOGGER.warn( + throw new PipeConnectionException( String.format( - "Connect to receiver %s:%s error.", firstNodeUrl.getIp(), firstNodeUrl.getPort()), + "Connect to receiver %s:%s error: %s", + e.getMessage(), firstNodeUrl.getIp(), firstNodeUrl.getPort()), e); - throw new PipeConnectionException(e.getMessage(), e); } } @@ -204,7 +220,7 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { try { final AsyncPipeDataTransferServiceClient client = - ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl); + asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl); try { pipeTransferInsertNodeReqHandler.transfer(client); @@ -219,8 +235,7 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { pipeTransferInsertNodeReqHandler.onError(ex); LOGGER.warn( String.format( - "Failed to borrow client from client pool for receiver %s:%s.", - targetNodeUrl.getIp(), targetNodeUrl.getPort()), + FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()), ex); } } @@ -232,7 +247,7 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { try { final AsyncPipeDataTransferServiceClient client = - ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl); + asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl); try { pipeTransferTabletReqHandler.transfer(client); @@ -247,8 +262,7 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { pipeTransferTabletReqHandler.onError(ex); LOGGER.warn( String.format( - "Failed to borrow client from client pool for receiver %s:%s.", - targetNodeUrl.getIp(), targetNodeUrl.getPort()), + FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()), ex); } } @@ -278,7 +292,7 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { try { final AsyncPipeDataTransferServiceClient client = - ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl); + asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl); try { pipeTransferTsFileInsertionEventHandler.transfer(client); @@ -293,8 +307,7 @@ public class IoTDBThriftConnectorV2 implements PipeConnector { pipeTransferTsFileInsertionEventHandler.onError(ex); LOGGER.warn( String.format( - "Failed to borrow client from client pool for receiver %s:%s.", - targetNodeUrl.getIp(), targetNodeUrl.getPort()), + FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), targetNodeUrl.getPort()), ex); } }
