This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-6013-1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5a69816799d41d4839a25dea29a095f2833a5cfc
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jun 20 11:35:09 2023 +0800

    [IOTDB-6013] Pipe: pipe-related threads (pools) should not be initialized 
unless necessary
---
 .../manager/pipe/runtime/PipeHeartbeatParser.java  | 47 ++++++++++++----------
 .../pipe/runtime/PipeLeaderChangeHandler.java      | 14 ++++---
 .../pipe/runtime/PipeRuntimeCoordinator.java       | 22 ++++++++--
 .../pipe/connector/v2/IoTDBThriftConnectorV2.java  | 47 ++++++++++++++--------
 4 files changed, 84 insertions(+), 46 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
index 537a5e7d1cc..5e5564758dc 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
@@ -90,24 +90,29 @@ public class PipeHeartbeatParser {
       return;
     }
 
-    PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit(
-        () -> {
-          if (!pipeMetaByteBufferListFromDataNode.isEmpty()) {
-            parseHeartbeatAndSaveMetaChangeLocally(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
-          }
-
-          if (canSubmitHandleMetaChangeProcedure.get()
-              && (needWriteConsensusOnConfigNodes.get() || 
needPushPipeMetaToDataNodes.get())) {
-            configManager
-                .getProcedureManager()
-                .pipeHandleMetaChange(
-                    needWriteConsensusOnConfigNodes.get(), 
needPushPipeMetaToDataNodes.get());
-
-            // reset flags after procedure is submitted
-            needWriteConsensusOnConfigNodes.set(false);
-            needPushPipeMetaToDataNodes.set(false);
-          }
-        });
+    configManager
+        .getPipeManager()
+        .getPipeRuntimeCoordinator()
+        .getProcedureSubmitter()
+        .submit(
+            () -> {
+              if (!pipeMetaByteBufferListFromDataNode.isEmpty()) {
+                parseHeartbeatAndSaveMetaChangeLocally(
+                    dataNodeId, pipeMetaByteBufferListFromDataNode);
+              }
+
+              if (canSubmitHandleMetaChangeProcedure.get()
+                  && (needWriteConsensusOnConfigNodes.get() || 
needPushPipeMetaToDataNodes.get())) {
+                configManager
+                    .getProcedureManager()
+                    .pipeHandleMetaChange(
+                        needWriteConsensusOnConfigNodes.get(), 
needPushPipeMetaToDataNodes.get());
+
+                // reset flags after procedure is submitted
+                needWriteConsensusOnConfigNodes.set(false);
+                needPushPipeMetaToDataNodes.set(false);
+              }
+            });
   }
 
   private void parseHeartbeatAndSaveMetaChangeLocally(
@@ -197,9 +202,9 @@ public class PipeHeartbeatParser {
               needPushPipeMetaToDataNodes.set(true);
 
               LOGGER.warn(
-                  String.format(
-                      "Detect PipeRuntimeCriticalException %s from DataNode, 
stop pipe %s.",
-                      exception, pipeName));
+                  "Detect PipeRuntimeCriticalException {} from DataNode, stop 
pipe {}.",
+                  exception,
+                  pipeName);
             }
 
             if (exception instanceof PipeRuntimeConnectorCriticalException) {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
index 3f93fb022b9..1e91bd55cc4 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
@@ -78,10 +78,14 @@ public class PipeLeaderChangeHandler implements 
IClusterStatusSubscriber {
     }
 
     // submit procedure in an async way to avoid blocking the caller
-    PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit(
-        () ->
-            configManager
-                .getProcedureManager()
-                
.pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap));
+    configManager
+        .getPipeManager()
+        .getPipeRuntimeCoordinator()
+        .getProcedureSubmitter()
+        .submit(
+            () ->
+                configManager
+                    .getProcedureManager()
+                    
.pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap));
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 23398038b55..59ed8e5056c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -31,24 +31,40 @@ import org.jetbrains.annotations.NotNull;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
 
   // shared thread pool in the runtime package
-  static final ExecutorService PROCEDURE_SUBMITTER =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName());
+  private static final AtomicReference<ExecutorService> 
procedureSubmitterHolder =
+      new AtomicReference<>();
+  private final ExecutorService procedureSubmitter;
 
   private final PipeLeaderChangeHandler pipeLeaderChangeHandler;
   private final PipeHeartbeatParser pipeHeartbeatParser;
   private final PipeMetaSyncer pipeMetaSyncer;
 
   public PipeRuntimeCoordinator(ConfigManager configManager) {
+    if (procedureSubmitterHolder.get() == null) {
+      synchronized (PipeRuntimeCoordinator.class) {
+        if (procedureSubmitterHolder.get() == null) {
+          procedureSubmitterHolder.set(
+              IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+                  ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName()));
+        }
+      }
+    }
+    procedureSubmitter = procedureSubmitterHolder.get();
+
     pipeLeaderChangeHandler = new PipeLeaderChangeHandler(configManager);
     pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
     pipeMetaSyncer = new PipeMetaSyncer(configManager);
   }
 
+  public ExecutorService getProcedureSubmitter() {
+    return procedureSubmitter;
+  }
+
   @Override
   public void onClusterStatisticsChanged(StatisticsChangeEvent event) {
     pipeLeaderChangeHandler.onClusterStatisticsChanged(event);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
index edba5374f75..0e76e1d0d5c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
@@ -73,15 +73,16 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 public class IoTDBThriftConnectorV2 implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
+  private static final String FAILED_TO_BORROW_CLIENT_FORMATTER =
+      "Failed to borrow client from client pool for receiver %s:%s.";
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
-  private static final IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
-      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER =
-          new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
-              .createClientManager(
-                  new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
+  private static volatile IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
+      asyncPipeDataTransferClientManagerHolder;
+  private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
+      asyncPipeDataTransferClientManager;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
@@ -92,6 +93,21 @@ public class IoTDBThriftConnectorV2 implements PipeConnector 
{
 
   private List<TEndPoint> nodeUrls;
 
+  public IoTDBThriftConnectorV2() {
+    if (asyncPipeDataTransferClientManagerHolder == null) {
+      synchronized (IoTDBThriftConnectorV2.class) {
+        if (asyncPipeDataTransferClientManagerHolder == null) {
+          asyncPipeDataTransferClientManagerHolder =
+              new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
+                  .createClientManager(
+                      new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
+        }
+      }
+    }
+
+    asyncPipeDataTransferClientManager = 
asyncPipeDataTransferClientManagerHolder;
+  }
+
   public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
     commitQueue.offer(
         new Pair<>(
@@ -151,11 +167,11 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
         throw new PipeException(String.format("Handshake error, result status 
%s.", resp.status));
       }
     } catch (TException e) {
-      LOGGER.warn(
+      throw new PipeConnectionException(
           String.format(
-              "Connect to receiver %s:%s error.", firstNodeUrl.getIp(), 
firstNodeUrl.getPort()),
+              "Connect to receiver %s:%s error: %s",
+              e.getMessage(), firstNodeUrl.getIp(), firstNodeUrl.getPort()),
           e);
-      throw new PipeConnectionException(e.getMessage(), e);
     }
   }
 
@@ -204,7 +220,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client =
-          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
 
       try {
         pipeTransferInsertNodeReqHandler.transfer(client);
@@ -219,8 +235,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
       pipeTransferInsertNodeReqHandler.onError(ex);
       LOGGER.warn(
           String.format(
-              "Failed to borrow client from client pool for receiver %s:%s.",
-              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
     }
   }
@@ -232,7 +247,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client =
-          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
 
       try {
         pipeTransferTabletReqHandler.transfer(client);
@@ -247,8 +262,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
       pipeTransferTabletReqHandler.onError(ex);
       LOGGER.warn(
           String.format(
-              "Failed to borrow client from client pool for receiver %s:%s.",
-              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
     }
   }
@@ -278,7 +292,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client =
-          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
 
       try {
         pipeTransferTsFileInsertionEventHandler.transfer(client);
@@ -293,8 +307,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
       pipeTransferTsFileInsertionEventHandler.onError(ex);
       LOGGER.warn(
           String.format(
-              "Failed to borrow client from client pool for receiver %s:%s.",
-              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
     }
   }

Reply via email to