This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new efdd472b7d1 Fix pipe runtime meta push blocking user operations (#17909) (#17953)
efdd472b7d1 is described below

commit efdd472b7d103a05ea1796ac05feb0fffe35881f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:01:47 2026 +0800

    Fix pipe runtime meta push blocking user operations (#17909) (#17953)
    
    * Fix pipe runtime meta push blocking user operations
    
    * Fix subscription runtime push blocking
    
    * Harden runtime meta sync best effort
    
    * Bound required metadata push waits
    
    * Fix async request dispatch failure hang
    
    * Harden async request failure handling
    
    * Use safe time conversion for runtime meta sync intervals
    
    * Harden subscription runtime meta sync timeouts
    
    * Bound pipe heartbeat metadata collection waits
    
    * Harden best-effort runtime metadata requests
    
    * Fix heartbeat client pool
    
    * Add tests and use real fail message & status
    
    (cherry picked from commit 1df19137839b3e813f1441071469c0a65afe2844)
---
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  12 +-
 .../java/org/apache/iotdb/rpc/RpcUtilsTest.java    |  25 +++
 .../async/AsyncAINodeHeartbeatClientPool.java      |  27 ++-
 .../async/AsyncConfigNodeHeartbeatClientPool.java  |  27 ++-
 .../async/AsyncDataNodeHeartbeatClientPool.java    |  33 ++-
 .../rpc/CheckTimeSeriesExistenceRPCHandler.java    |   2 +-
 .../rpc/CountPathsUsingTemplateRPCHandler.java     |   2 +-
 .../rpc/FetchSchemaBlackListRPCHandler.java        |   2 +-
 .../async/handlers/rpc/SchemaUpdateRPCHandler.java |   2 +-
 .../CheckSchemaRegionUsingTemplateRPCHandler.java  |   2 +-
 .../runtime/heartbeat/PipeHeartbeatScheduler.java  |  13 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      | 236 +++++++++++++++++----
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  22 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |   2 +-
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |   6 +-
 .../AbstractOperateSubscriptionProcedure.java      |  20 ++
 .../runtime/ConsumerGroupMetaSyncProcedure.java    |  10 +-
 .../topic/runtime/TopicMetaSyncProcedure.java      |   9 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  20 +-
 .../client/request/AsyncRequestManager.java        |  45 +++-
 .../client/request/AsyncRequestManagerTest.java    | 221 +++++++++++++++++++
 22 files changed, 652 insertions(+), 88 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 5670f80f247..88da1ab8cb3 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -122,8 +122,7 @@ public class RpcUtils {
   }
 
   public static void verifySuccess(List<TSStatus> statuses) throws 
BatchExecutionException {
-    StringBuilder errMsgs =
-        new 
StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": 
");
+    StringBuilder errMsgs = new StringBuilder();
     for (TSStatus status : statuses) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -131,7 +130,8 @@ public class RpcUtils {
       }
     }
     if (errMsgs.length() > 0) {
-      throw new BatchExecutionException(statuses, errMsgs.toString());
+      throw new BatchExecutionException(
+          statuses, TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ": " + 
errMsgs);
     }
   }
 
@@ -152,9 +152,9 @@ public class RpcUtils {
       for (TSStatus subStatus : statusList) {
         if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && subStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          if (!msgSet.contains(status)) {
-            errMsg.append(status).append("; ");
-            msgSet.add(status);
+          if (!msgSet.contains(subStatus)) {
+            errMsg.append(subStatus).append("; ");
+            msgSet.add(subStatus);
           }
         }
       }
diff --git 
a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java 
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
index 12969e96904..b785ca83530 100644
--- 
a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
+++ 
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
@@ -19,11 +19,15 @@
 
 package org.apache.iotdb.rpc;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
 
 public class RpcUtilsTest {
 
@@ -64,4 +68,25 @@ public class RpcUtilsTest {
         "1970-01-01T07:59:59.999+08:00",
         RpcUtils.parseLongToDateWithPrecision(formatter, -1, zoneId, "ms"));
   }
+
+  @Test
+  public void testVerifySuccessListAllowsSuccessfulStatuses() throws 
BatchExecutionException {
+    RpcUtils.verifySuccess(
+        Arrays.asList(
+            RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+            RpcUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND)));
+  }
+
+  @Test
+  public void testVerifySuccessListThrowsOnFailure() {
+    TSStatus failedStatus = 
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "failed");
+
+    try {
+      RpcUtils.verifySuccess(Collections.singletonList(failedStatus));
+      Assert.fail("Expected BatchExecutionException");
+    } catch (BatchExecutionException e) {
+      Assert.assertEquals(Collections.singletonList(failedStatus), 
e.getStatusList());
+      Assert.assertTrue(e.getMessage().contains("failed"));
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index 8ec455142b8..fa1d9adcab3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.ainode.AsyncAINodeServiceClient;
@@ -41,10 +42,30 @@ public class AsyncAINodeHeartbeatClientPool {
 
   public void getAINodeHeartBeat(
       TEndPoint endPoint, TAIHeartbeatReq req, AINodeHeartbeatHandler handler) 
{
+    AsyncAINodeServiceClient client = null;
+    boolean dispatched = false;
     try {
-      clientManager.borrowClient(endPoint).getAIHeartbeat(req, handler);
-    } catch (Exception ignore) {
-      // Just ignore
+      client = clientManager.borrowClient(endPoint);
+      client.getAIHeartbeat(req, handler);
+      dispatched = true;
+    } catch (Exception e) {
+      handleError(handler, e);
+    } finally {
+      // After the async call is dispatched, the client's onComplete/onError 
callback is
+      // responsible for returning the client. If the RPC was not dispatched 
(exception
+      // before/during the call), the client must be returned here to prevent 
pool leakage.
+      if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
+        ((ClientManager<TEndPoint, AsyncAINodeServiceClient>) clientManager)
+            .returnClient(endPoint, client);
+      }
+    }
+  }
+
+  private void handleError(final AINodeHeartbeatHandler handler, final 
Exception e) {
+    try {
+      handler.onError(e);
+    } catch (final Exception ignore) {
+      // Ignore handler failures in heartbeat best-effort path.
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index a6dffbe0eef..ab30935753b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
@@ -48,10 +49,30 @@ public class AsyncConfigNodeHeartbeatClientPool {
       TEndPoint endPoint,
       TConfigNodeHeartbeatReq heartbeatReq,
       ConfigNodeHeartbeatHandler handler) {
+    AsyncConfigNodeInternalServiceClient client = null;
+    boolean dispatched = false;
     try {
-      
clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(heartbeatReq, 
handler);
-    } catch (Exception ignore) {
-      // Just ignore
+      client = clientManager.borrowClient(endPoint);
+      client.getConfigNodeHeartBeat(heartbeatReq, handler);
+      dispatched = true;
+    } catch (Exception e) {
+      handleError(handler, e);
+    } finally {
+      // After the async call is dispatched, the client's onComplete/onError 
callback is
+      // responsible for returning the client. If the RPC was not dispatched 
(exception
+      // before/during the call), the client must be returned here to prevent 
pool leakage.
+      if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
+        ((ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>) 
clientManager)
+            .returnClient(endPoint, client);
+      }
+    }
+  }
+
+  private void handleError(final ConfigNodeHeartbeatHandler handler, final 
Exception e) {
+    try {
+      handler.onError(e);
+    } catch (final Exception ignore) {
+      // Ignore handler failures in heartbeat best-effort path.
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index 18a8120a9e4..60c43a0ae71 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
@@ -47,14 +48,38 @@ public class AsyncDataNodeHeartbeatClientPool {
    */
   public void getDataNodeHeartBeat(
       TEndPoint endPoint, TDataNodeHeartbeatReq req, DataNodeHeartbeatHandler 
handler) {
+    AsyncDataNodeInternalServiceClient client = null;
+    boolean dispatched = false;
     try {
-      clientManager.borrowClient(endPoint).getDataNodeHeartBeat(req, handler);
-    } catch (Exception ignore) {
-      // Just ignore
+      client = clientManager.borrowClient(endPoint);
+      client.getDataNodeHeartBeat(req, handler);
+      dispatched = true;
+    } catch (Exception e) {
+      handleError(handler, e);
+    } finally {
+      returnClientIfNotDispatched(endPoint, client, dispatched);
+    }
+  }
+
+  // After the async call is dispatched, the client's onComplete/onError 
callback is responsible
+  // for returning the client. If the RPC was not dispatched (exception 
before/during the call),
+  // the client must be returned here to prevent pool leakage.
+  private void returnClientIfNotDispatched(
+      TEndPoint endPoint, AsyncDataNodeInternalServiceClient client, boolean 
dispatched) {
+    if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
+      ((ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>) 
clientManager)
+          .returnClient(endPoint, client);
+    }
+  }
+
+  private void handleError(final DataNodeHeartbeatHandler handler, final 
Exception e) {
+    try {
+      handler.onError(e);
+    } catch (final Exception ignore) {
+      // Ignore handler failures in heartbeat best-effort path.
     }
   }
 
-  // TODO: Is the AsyncDataNodeHeartbeatClientPool must be a singleton?
   private static class AsyncDataNodeHeartbeatClientPoolHolder {
 
     private static final AsyncDataNodeHeartbeatClientPool INSTANCE =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
index 3a735691a0e..05f480062cd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
@@ -75,11 +75,11 @@ public class CheckTimeSeriesExistenceRPCHandler
             + e.getMessage();
     LOGGER.error(errorMsg);
 
-    countDownLatch.countDown();
     TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
     resp.setStatus(
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
     responseMap.put(requestId, resp);
+    countDownLatch.countDown();
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
index b27c74bb41d..8089e2aa524 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
@@ -75,11 +75,11 @@ public class CountPathsUsingTemplateRPCHandler
             + e.getMessage();
     LOGGER.error(errorMsg);
 
-    countDownLatch.countDown();
     TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
     resp.setStatus(
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
     responseMap.put(requestId, resp);
+    countDownLatch.countDown();
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
index 693017ec02d..07ce15fd0fb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
@@ -77,11 +77,11 @@ public class FetchSchemaBlackListRPCHandler
             + e.getMessage();
     LOGGER.error(errorMsg);
 
-    countDownLatch.countDown();
     TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
     resp.setStatus(
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
     responseMap.put(requestId, resp);
+    countDownLatch.countDown();
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
index dc2796a232e..0ca64f51fbb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
@@ -75,10 +75,10 @@ public class SchemaUpdateRPCHandler extends 
DataNodeTSStatusRPCHandler {
             + e.getMessage();
     LOGGER.warn(errorMsg);
 
-    countDownLatch.countDown();
     responseMap.put(
         requestId,
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
+    countDownLatch.countDown();
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
index 249e8b51767..1c2f77ef520 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
@@ -78,11 +78,11 @@ public class CheckSchemaRegionUsingTemplateRPCHandler
             + e.getMessage();
     LOGGER.error(errorMsg);
 
-    countDownLatch.countDown();
     TCheckSchemaRegionUsingTemplateResp resp = new 
TCheckSchemaRegionUsingTemplateResp();
     resp.setStatus(
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
     responseMap.put(requestId, resp);
+    countDownLatch.countDown();
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 8f864b4d5c2..d4611bd2f2e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -50,6 +50,7 @@ public class PipeHeartbeatScheduler {
       PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled();
   private static final long HEARTBEAT_INTERVAL_SECONDS =
       
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
+  private static final int PIPE_HEARTBEAT_RETRY_NUM = 1;
 
   private static final ScheduledExecutorService HEARTBEAT_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
@@ -99,12 +100,8 @@ public class PipeHeartbeatScheduler {
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_HEARTBEAT, request, 
dataNodeLocationMap);
     CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
-                * 1000L
-                * 2
-                / 3);
+        .sendAsyncRequest(
+            clientHandler, PIPE_HEARTBEAT_RETRY_NUM, 
getPipeHeartbeatRequestTimeoutInMs(), true);
     clientHandler
         .getResponseMap()
         .forEach(
@@ -133,6 +130,10 @@ public class PipeHeartbeatScheduler {
     }
   }
 
+  private static long getPipeHeartbeatRequestTimeoutInMs() {
+    return TimeUnit.SECONDS.toMillis(HEARTBEAT_INTERVAL_SECONDS) * 2 / 3;
+  }
+
   public synchronized void stop() {
     if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) {
       heartbeatFuture.cancel(false);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 6914f8aacf5..a96d12af11c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
@@ -107,6 +108,8 @@ public class ConfigNodeProcedureEnv {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ConfigNodeProcedureEnv.class);
 
+  private static final int RUNTIME_META_PUSH_RETRY_NUM = 1;
+
   /** Add or remove node lock. */
   private final LockQueue nodeLock = new LockQueue();
 
@@ -687,10 +690,23 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushPipeMetaReq, TPushPipeMetaResp> 
clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
+    return clientHandler.getResponseMap();
+  }
+
+  public Map<Integer, TPushPipeMetaResp> pushAllPipeMetaToDataNodesBestEffort(
+      List<ByteBuffer> pipeMetaBinaryList) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPushPipeMetaReq request = new 
TPushPipeMetaReq().setPipeMetas(pipeMetaBinaryList);
+
+    final DataNodeAsyncRequestContext<TPushPipeMetaReq, TPushPipeMetaResp> 
clientHandler =
+        new DataNodeAsyncRequestContext<>(
+            CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request, 
dataNodeLocationMap);
+    final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -702,10 +718,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushSinglePipeMetaReq, 
TPushPipeMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -718,10 +733,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushSinglePipeMetaReq, 
TPushPipeMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -735,10 +749,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushMultiPipeMetaReq, 
TPushPipeMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -751,10 +764,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushMultiPipeMetaReq, 
TPushPipeMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -767,10 +779,23 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushTopicMetaReq, TPushTopicMetaResp> 
clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
+    return clientHandler.getResponseMap();
+  }
+
+  public Map<Integer, TPushTopicMetaResp> 
pushAllTopicMetaToDataNodesBestEffort(
+      List<ByteBuffer> topicMetaBinaryList) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPushTopicMetaReq request = new 
TPushTopicMetaReq().setTopicMetas(topicMetaBinaryList);
+
+    final DataNodeAsyncRequestContext<TPushTopicMetaReq, TPushTopicMetaResp> 
clientHandler =
+        new DataNodeAsyncRequestContext<>(
+            CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request, 
dataNodeLocationMap);
+    final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -782,7 +807,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushSingleTopicMetaReq, 
TPushTopicMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseList().stream()
         .map(TPushTopicMetaResp::getStatus)
         .collect(Collectors.toList());
@@ -797,7 +824,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushSingleTopicMetaReq, 
TPushTopicMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseList().stream()
         .map(TPushTopicMetaResp::getStatus)
         .collect(Collectors.toList());
@@ -813,10 +842,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushMultiTopicMetaReq, 
TPushTopicMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -829,10 +857,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushMultiTopicMetaReq, 
TPushTopicMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -847,10 +874,25 @@ public class ConfigNodeProcedureEnv {
         clientHandler =
             new DataNodeAsyncRequestContext<>(
                 CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
+    return clientHandler.getResponseMap();
+  }
+
+  /**
+   * Pushes the given serialized consumer group metas to all registered DataNodes through the
+   * best-effort runtime meta request path.
+   *
+   * <p>After the request returns, any DataNode without an entry in the response map gets a
+   * synthesized failure response, so callers can inspect one response per remaining request index.
+   *
+   * @param consumerGroupMetaBinaryList serialized consumer group metas to push
+   * @return the response map of the request context, keyed by DataNode id
+   */
+  public Map<Integer, TPushConsumerGroupMetaResp> 
pushAllConsumerGroupMetaToDataNodesBestEffort(
+      List<ByteBuffer> consumerGroupMetaBinaryList) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPushConsumerGroupMetaReq request =
+        new 
TPushConsumerGroupMetaReq().setConsumerGroupMetas(consumerGroupMetaBinaryList);
+
+    final DataNodeAsyncRequestContext<TPushConsumerGroupMetaReq, 
TPushConsumerGroupMetaResp>
+        clientHandler =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request, 
dataNodeLocationMap);
+    // The best-effort sender returns the timeout it used so the filler can report it.
+    final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+    fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -866,7 +908,9 @@ public class ConfigNodeProcedureEnv {
                 CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
                 request,
                 dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseList().stream()
         .map(TPushConsumerGroupMetaResp::getStatus)
         .collect(Collectors.toList());
@@ -884,12 +928,124 @@ public class ConfigNodeProcedureEnv {
                 CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
                 request,
                 dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseList().stream()
         .map(TPushConsumerGroupMetaResp::getStatus)
         .collect(Collectors.toList());
   }
 
+  /**
+   * Sends a runtime meta request in best-effort mode, i.e. with {@code keepSilent} set to true and
+   * the timeout returned by {@link #getRuntimeMetaPushTimeoutInMs()}.
+   *
+   * @param clientHandler request context to dispatch
+   * @return the timeout in milliseconds that was applied to the request
+   */
+  private static long sendBestEffortRuntimeMetaRequest(
+      final DataNodeAsyncRequestContext<?, ?> clientHandler) {
+    final long bestEffortTimeoutInMs = getRuntimeMetaPushTimeoutInMs();
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequest(clientHandler, RUNTIME_META_PUSH_RETRY_NUM, bestEffortTimeoutInMs, true);
+    return bestEffortTimeoutInMs;
+  }
+
+  /**
+   * Sends a metadata request whose outcome the caller depends on, i.e. with {@code keepSilent}
+   * set to false, bounded by the given timeout.
+   *
+   * @param clientHandler request context to dispatch
+   * @param timeoutInMs timeout in milliseconds passed to the async request manager
+   */
+  private static void sendRequiredMetadataRequest(
+      final DataNodeAsyncRequestContext<?, ?> clientHandler, final long timeoutInMs) {
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequest(clientHandler, RUNTIME_META_PUSH_RETRY_NUM, timeoutInMs, false);
+  }
+
+  /**
+   * Dispatches the request through the ConfigNode-to-DataNode async request manager using
+   * {@code RUNTIME_META_PUSH_RETRY_NUM} as the retry count.
+   *
+   * @param clientHandler request context to dispatch
+   * @param keepSilent forwarded unchanged to {@code sendAsyncRequest}
+   * @param timeoutInMs timeout in milliseconds forwarded to {@code sendAsyncRequest}
+   */
+  private static void sendRuntimeMetaRequest(
+      final DataNodeAsyncRequestContext<?, ?> clientHandler,
+      final boolean keepSilent,
+      final long timeoutInMs) {
+    final CnToDnInternalServiceAsyncRequestManager requestManager =
+        CnToDnInternalServiceAsyncRequestManager.getInstance();
+    requestManager.sendAsyncRequest(
+        clientHandler, RUNTIME_META_PUSH_RETRY_NUM, timeoutInMs, keepSilent);
+  }
+
+  /**
+   * Completes the response map of a pipe meta push so that every index reported by
+   * {@code getRequestIndices()} has an entry.
+   *
+   * <p>Indices without a response receive a {@link TPushPipeMetaResp} whose status uses
+   * {@code PIPE_PUSH_META_ERROR}; existing responses are not replaced.
+   *
+   * @param clientHandler request context whose response map is completed in place
+   * @param timeoutInMs timeout value reported in the synthesized failure message
+   */
+  private static void fillMissingPipePushMetaResponses(
+      final DataNodeAsyncRequestContext<?, TPushPipeMetaResp> clientHandler,
+      final long timeoutInMs) {
+    clientHandler
+        .getRequestIndices()
+        .forEach(
+            dataNodeId -> {
+              final Map<Integer, TPushPipeMetaResp> responses = clientHandler.getResponseMap();
+              final TSStatus timeoutStatus =
+                  createMetadataTimeoutStatus(
+                      clientHandler.getRequestType(),
+                      dataNodeId,
+                      TSStatusCode.PIPE_PUSH_META_ERROR,
+                      timeoutInMs);
+              responses.putIfAbsent(dataNodeId, new TPushPipeMetaResp(timeoutStatus));
+            });
+  }
+
+  /**
+   * Completes the response map of a topic meta push so that every index reported by
+   * {@code getRequestIndices()} has an entry.
+   *
+   * <p>Indices without a response receive a {@link TPushTopicMetaResp} whose status uses
+   * {@code TOPIC_PUSH_META_ERROR}; existing responses are not replaced.
+   *
+   * @param clientHandler request context whose response map is completed in place
+   * @param timeoutInMs timeout value reported in the synthesized failure message
+   */
+  private static void fillMissingTopicPushMetaResponses(
+      final DataNodeAsyncRequestContext<?, TPushTopicMetaResp> clientHandler,
+      final long timeoutInMs) {
+    clientHandler
+        .getRequestIndices()
+        .forEach(
+            dataNodeId -> {
+              final Map<Integer, TPushTopicMetaResp> responses = clientHandler.getResponseMap();
+              final TSStatus timeoutStatus =
+                  createMetadataTimeoutStatus(
+                      clientHandler.getRequestType(),
+                      dataNodeId,
+                      TSStatusCode.TOPIC_PUSH_META_ERROR,
+                      timeoutInMs);
+              responses.putIfAbsent(dataNodeId, new TPushTopicMetaResp(timeoutStatus));
+            });
+  }
+
+  /**
+   * Completes the response map of a consumer group meta push so that every index reported by
+   * {@code getRequestIndices()} has an entry.
+   *
+   * <p>Indices without a response receive a {@link TPushConsumerGroupMetaResp} whose status uses
+   * {@code CONSUMER_PUSH_META_ERROR}; existing responses are not replaced.
+   *
+   * @param clientHandler request context whose response map is completed in place
+   * @param timeoutInMs timeout value reported in the synthesized failure message
+   */
+  private static void fillMissingConsumerGroupPushMetaResponses(
+      final DataNodeAsyncRequestContext<?, TPushConsumerGroupMetaResp> clientHandler,
+      final long timeoutInMs) {
+    clientHandler
+        .getRequestIndices()
+        .forEach(
+            dataNodeId -> {
+              final Map<Integer, TPushConsumerGroupMetaResp> responses =
+                  clientHandler.getResponseMap();
+              final TSStatus timeoutStatus =
+                  createMetadataTimeoutStatus(
+                      clientHandler.getRequestType(),
+                      dataNodeId,
+                      TSStatusCode.CONSUMER_PUSH_META_ERROR,
+                      timeoutInMs);
+              responses.putIfAbsent(dataNodeId, new TPushConsumerGroupMetaResp(timeoutStatus));
+            });
+  }
+
+  /**
+   * Builds the failure status used for a DataNode that has no response to a metadata request.
+   *
+   * @param requestType request type, rendered into the message
+   * @param dataNodeId id of the DataNode without a response
+   * @param statusCode status code to report
+   * @param timeoutInMs timeout in milliseconds, rendered into the message
+   * @return a status with the given code and a message naming the request, node and timeout
+   */
+  private static TSStatus createMetadataTimeoutStatus(
+      final CnToDnAsyncRequestType requestType,
+      final int dataNodeId,
+      final TSStatusCode statusCode,
+      final long timeoutInMs) {
+    final TSStatus status = new TSStatus(statusCode.getStatusCode());
+    final String message =
+        String.format(
+            "Failed to %s on DataNode %s before metadata request timeout %sms",
+            requestType, dataNodeId, timeoutInMs);
+    return status.setMessage(message);
+  }
+
+  /**
+   * Returns the timeout for best-effort runtime meta pushes: two thirds of the pipe heartbeat
+   * interval used for collecting pipe meta, converted to milliseconds.
+   *
+   * @return the timeout in milliseconds
+   */
+  private static long getRuntimeMetaPushTimeoutInMs() {
+    final long intervalInMs =
+        TimeUnit.SECONDS.toMillis(
+            PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta());
+    // TimeUnit#toMillis saturates instead of overflowing, but multiplying the saturated value by
+    // 2 would still wrap around. Dividing first and adding the remainder term back yields exactly
+    // intervalInMs * 2 / 3 for every input without any intermediate overflow.
+    return (intervalInMs / 3) * 2 + ((intervalInMs % 3) * 2) / 3;
+  }
+
+  /**
+   * Returns the timeout for required pipe metadata requests: two thirds of the pipe meta syncer
+   * sync interval, converted to milliseconds.
+   *
+   * @return the timeout in milliseconds
+   */
+  private static long getRequiredPipeMetadataRequestTimeoutInMs() {
+    final long intervalInMs =
+        TimeUnit.MINUTES.toMillis(PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes());
+    // TimeUnit#toMillis saturates instead of overflowing, but multiplying the saturated value by
+    // 2 would still wrap around. Dividing first and adding the remainder term back yields exactly
+    // intervalInMs * 2 / 3 for every input without any intermediate overflow.
+    return (intervalInMs / 3) * 2 + ((intervalInMs % 3) * 2) / 3;
+  }
+
+  /**
+   * Returns the timeout for required subscription metadata requests: two thirds of the
+   * subscription meta syncer sync interval, converted to milliseconds.
+   *
+   * @return the timeout in milliseconds
+   */
+  private static long getRequiredSubscriptionMetadataRequestTimeoutInMs() {
+    final long intervalInMs =
+        TimeUnit.MINUTES.toMillis(
+            SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes());
+    // TimeUnit#toMillis saturates instead of overflowing, but multiplying the saturated value by
+    // 2 would still wrap around. Dividing first and adding the remainder term back yields exactly
+    // intervalInMs * 2 / 3 for every input without any intermediate overflow.
+    return (intervalInMs / 3) * 2 + ((intervalInMs % 3) * 2) / 3;
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 857d5733f6a..2fa0091568f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -464,10 +464,11 @@ public abstract class AbstractOperatePipeProcedureV2
 
       if (resp.getStatus().getCode() == 
TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) {
         if (!resp.isSetExceptionMessages()) {
+          final String statusMessage = resp.getStatus().getMessage();
           exceptionMessageBuilder.append(
               String.format(
-                  "DataNodeId: %s, Message: Internal error while processing 
pushPipeMeta on dataNodes.",
-                  dataNodeId));
+                  "DataNodeId: %s, Message: Internal error while processing 
pushPipeMeta on dataNodes.%s",
+                  dataNodeId, statusMessage == null ? "" : " " + 
statusMessage));
           continue;
         }
 
@@ -515,6 +516,23 @@ public abstract class AbstractOperatePipeProcedureV2
     }
   }
 
+  /**
+   * Serializes every pipe meta held by {@code pipeTaskInfo} and pushes the result to the DataNodes
+   * through the best-effort push of the procedure environment.
+   *
+   * @param env procedure environment used to perform the push
+   * @return the responses returned by {@code env.pushAllPipeMetaToDataNodesBestEffort}
+   * @throws IOException declared for failures while serializing a pipe meta
+   */
+  protected Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodesBestEffortAndGetResponse(
+      ConfigNodeProcedureEnv env) throws IOException {
+    final List<ByteBuffer> serializedPipeMetas = new ArrayList<>();
+    for (final PipeMeta meta : pipeTaskInfo.get().getPipeMetaList()) {
+      final ByteBuffer serialized = meta.serialize();
+      serializedPipeMetas.add(serialized);
+    }
+    return env.pushAllPipeMetaToDataNodesBestEffort(serializedPipeMetas);
+  }
+
+  /**
+   * Pushes all pipe metas to the DataNodes in best-effort mode and discards the responses.
+   *
+   * <p>Any exception raised by the push is logged at info level and not rethrown.
+   *
+   * @param env procedure environment used to perform the push
+   */
+  protected void pushPipeMetaToDataNodesBestEffort(ConfigNodeProcedureEnv env) {
+    try {
+      pushPipeMetaToDataNodesBestEffortAndGetResponse(env);
+    } catch (Exception pushFailure) {
+      // Deliberately not propagated: the log message states the push will be retried later.
+      LOGGER.info("Failed to push pipe meta list to data nodes, will retry later.", pushFailure);
+    }
+  }
+
   /**
    * Pushing one pipeMeta to all the dataNodes, forcing an update to the 
pipe's runtime state.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 61f6f3cae2a..300d04ee11b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -111,7 +111,7 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
   public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleLeaderChangeProcedure: 
executeFromHandleOnDataNodes");
 
-    pushPipeMetaToDataNodesIgnoreException(env);
+    pushPipeMetaToDataNodesBestEffort(env);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 401859f0a7e..3089b3dd072 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -128,7 +128,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
       return;
     }
 
-    pushPipeMetaToDataNodesIgnoreException(env);
+    pushPipeMetaToDataNodesBestEffort(env);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 282b540b3aa..9764a3af3a5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -50,7 +51,8 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeMetaSyncProcedure.class);
 
   private static final long MIN_EXECUTION_INTERVAL_MS =
-      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 
1000 / 2;
+      
TimeUnit.MINUTES.toMillis(PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes())
+          / 2;
   // No need to serialize this field
   private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
 
@@ -134,7 +136,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     LOGGER.debug("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
 
-    Map<Integer, TPushPipeMetaResp> respMap = pushPipeMetaToDataNodes(env);
+    Map<Integer, TPushPipeMetaResp> respMap = 
pushPipeMetaToDataNodesBestEffortAndGetResponse(env);
     if (pipeTaskInfo.get().recordDataNodePushPipeMetaExceptions(respMap)) {
       throw new PipeException(
           String.format(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 0b246ac4ef7..8f7c17f9ca7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -332,6 +332,16 @@ public abstract class AbstractOperateSubscriptionProcedure
     return env.pushAllTopicMetaToDataNodes(topicMetaBinaryList);
   }
 
+  /**
+   * Serializes every topic meta held by {@code subscriptionInfo} and pushes the result to the
+   * DataNodes through the best-effort push of the procedure environment.
+   *
+   * @param env procedure environment used to perform the push
+   * @return the responses returned by {@code env.pushAllTopicMetaToDataNodesBestEffort}
+   * @throws IOException declared for failures while serializing a topic meta
+   */
+  protected Map<Integer, TPushTopicMetaResp> pushTopicMetaToDataNodesBestEffort(
+      ConfigNodeProcedureEnv env) throws IOException {
+    final List<ByteBuffer> serializedTopicMetas = new ArrayList<>();
+    for (TopicMeta meta : subscriptionInfo.get().getAllTopicMeta()) {
+      final ByteBuffer serialized = meta.serialize();
+      serializedTopicMetas.add(serialized);
+    }
+    return env.pushAllTopicMetaToDataNodesBestEffort(serializedTopicMetas);
+  }
+
   public static boolean pushTopicMetaHasException(Map<Integer, 
TPushTopicMetaResp> respMap) {
     for (TPushTopicMetaResp resp : respMap.values()) {
       if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -358,6 +368,16 @@ public abstract class AbstractOperateSubscriptionProcedure
     return 
env.pushAllConsumerGroupMetaToDataNodes(consumerGroupMetaBinaryList);
   }
 
+  /**
+   * Serializes every consumer group meta held by {@code subscriptionInfo} and pushes the result
+   * to the DataNodes through the best-effort push of the procedure environment.
+   *
+   * @param env procedure environment used to perform the push
+   * @return the responses returned by {@code env.pushAllConsumerGroupMetaToDataNodesBestEffort}
+   * @throws IOException declared for failures while serializing a consumer group meta
+   */
+  protected Map<Integer, TPushConsumerGroupMetaResp> pushConsumerGroupMetaToDataNodesBestEffort(
+      ConfigNodeProcedureEnv env) throws IOException {
+    final List<ByteBuffer> serializedGroupMetas = new ArrayList<>();
+    for (ConsumerGroupMeta meta : subscriptionInfo.get().getAllConsumerGroupMeta()) {
+      final ByteBuffer serialized = meta.serialize();
+      serializedGroupMetas.add(serialized);
+    }
+    return env.pushAllConsumerGroupMetaToDataNodesBestEffort(serializedGroupMetas);
+  }
+
   public static boolean pushConsumerGroupMetaHasException(
       Map<Integer, TPushConsumerGroupMetaResp> respMap) {
     for (TPushConsumerGroupMetaResp resp : respMap.values()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index 93eb6c5a5fc..0f6b5b231d2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -20,7 +20,7 @@
 package 
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
 import 
org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
 import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -51,7 +52,9 @@ public class ConsumerGroupMetaSyncProcedure extends 
AbstractOperateSubscriptionP
       LoggerFactory.getLogger(ConsumerGroupMetaSyncProcedure.class);
 
   private static final long MIN_EXECUTION_INTERVAL_MS =
-      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 
1000 / 2;
+      TimeUnit.MINUTES.toMillis(
+              
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+          / 2;
   // No need to serialize this field
   private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
 
@@ -127,7 +130,8 @@ public class ConsumerGroupMetaSyncProcedure extends 
AbstractOperateSubscriptionP
       throws SubscriptionException, IOException {
     LOGGER.info("ConsumerGroupMetaSyncProcedure: 
executeFromOperateOnDataNodes");
 
-    Map<Integer, TPushConsumerGroupMetaResp> respMap = 
pushConsumerGroupMetaToDataNodes(env);
+    Map<Integer, TPushConsumerGroupMetaResp> respMap =
+        pushConsumerGroupMetaToDataNodesBestEffort(env);
     if (pushConsumerGroupMetaHasException(respMap)) {
       throw new SubscriptionException(
           String.format("Failed to push consumer group meta to dataNodes, 
details: %s", respMap));
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index 27919bbcb9e..da94ca0ec72 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import 
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
 import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -50,7 +51,9 @@ public class TopicMetaSyncProcedure extends 
AbstractOperateSubscriptionProcedure
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TopicMetaSyncProcedure.class);
 
   private static final long MIN_EXECUTION_INTERVAL_MS =
-      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 
1000 / 2;
+      TimeUnit.MINUTES.toMillis(
+              
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+          / 2;
   // No need to serialize this field
   private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
 
@@ -126,7 +129,7 @@ public class TopicMetaSyncProcedure extends 
AbstractOperateSubscriptionProcedure
       throws SubscriptionException, IOException {
     LOGGER.info("TopicMetaSyncProcedure: executeFromOperateOnDataNodes");
 
-    Map<Integer, TPushTopicMetaResp> respMap = pushTopicMetaToDataNodes(env);
+    Map<Integer, TPushTopicMetaResp> respMap = 
pushTopicMetaToDataNodesBestEffort(env);
     if (pushTopicMetaHasException(respMap)) {
       throw new SubscriptionException(
           String.format("Failed to push topic meta to dataNodes, details: %s", 
respMap));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f5c84a2375f..fcaef4baa30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -327,6 +327,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   private final DataNodeThrottleQuotaManager throttleQuotaManager =
       DataNodeThrottleQuotaManager.getInstance();
 
+  // Timeout passed to sendAsyncRequest by the test-connection helpers of this class; initialized
+  // once from the DataNode connection timeout of the common config.
+  private static final long TEST_CONNECTION_TIMEOUT_MS =
+      CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
+  // Retry count passed to sendAsyncRequest by the same test-connection helpers.
+  private static final int TEST_CONNECTION_RETRY_NUM = 1;
+
   private final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private final ExecutorService schemaExecutor =
@@ -1613,7 +1617,9 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.ConfigNodeInternalService,
         DnToCnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToCnRequestType, 
TConfigNodeLocation> handler) ->
-            
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DnToCnInternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequest(
+                    handler, TEST_CONNECTION_RETRY_NUM, 
TEST_CONNECTION_TIMEOUT_MS, true));
   }
 
   private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
@@ -1625,7 +1631,9 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeInternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DnToDnInternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequest(
+                    handler, TEST_CONNECTION_RETRY_NUM, 
TEST_CONNECTION_TIMEOUT_MS, true));
   }
 
   private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -1637,7 +1645,9 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeMPPService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DataNodeMPPServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequest(
+                    handler, TEST_CONNECTION_RETRY_NUM, 
TEST_CONNECTION_TIMEOUT_MS, true));
   }
 
   private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -1649,7 +1659,9 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeExternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DataNodeExternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequest(
+                    handler, TEST_CONNECTION_RETRY_NUM, 
TEST_CONNECTION_TIMEOUT_MS, true));
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index 0290d33d3a4..45a9d6f4817 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.client.request;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.utils.function.CheckedTriConsumer;
@@ -104,6 +105,14 @@ public abstract class AsyncRequestManager<RequestType, 
NodeLocation, Client> {
       AsyncRequestContext<?, ?, RequestType, NodeLocation> requestContext,
       int retryNum,
       Long timeoutInMs) {
+    sendAsyncRequest(requestContext, retryNum, timeoutInMs, false);
+  }
+
+  public void sendAsyncRequest(
+      AsyncRequestContext<?, ?, RequestType, NodeLocation> requestContext,
+      int retryNum,
+      Long timeoutInMs,
+      boolean keepSilent) {
     if (requestContext.getRequestIndices().isEmpty()) {
       return;
     }
@@ -143,7 +152,7 @@ public abstract class AsyncRequestManager<RequestType, 
NodeLocation, Client> {
       }
     }
 
-    if (!requestContext.getRequestIndices().isEmpty()) {
+    if (!requestContext.getRequestIndices().isEmpty() && !keepSilent) {
       LOGGER.warn(
           "Failed to {} on ConfigNode after {} retries, requestIndices: {}",
           requestType,
@@ -157,6 +166,10 @@ public abstract class AsyncRequestManager<RequestType, 
NodeLocation, Client> {
       int requestId,
       NodeLocation targetNode,
       int retryCount) {
+    final TEndPoint endPoint = nodeLocationToEndPoint(targetNode);
+    Client client = null;
+    boolean dispatched = false;
+    AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler = null;
     try {
       if (!actionMap.containsKey(requestContext.getRequestType())) {
         throw new UnsupportedOperationException(
@@ -164,20 +177,42 @@ public abstract class AsyncRequestManager<RequestType, 
NodeLocation, Client> {
                 + requestContext.getRequestType()
                 + ", please set it in 
AsyncRequestManager::initActionMapBuilder()");
       }
-      Client client = 
clientManager.borrowClient(nodeLocationToEndPoint(targetNode));
+      handler = buildHandler(requestContext, requestId, targetNode);
+      client = clientManager.borrowClient(endPoint);
       adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
       Object req = requestContext.getRequest(requestId);
-      AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
-          buildHandler(requestContext, requestId, targetNode);
       Objects.requireNonNull(actionMap.get(requestContext.getRequestType()))
           .accept(req, client, handler);
+      // After accept() returns, the async callback (onComplete/onError) takes 
over the
+      // responsibility of returning the client to the pool. Before this 
point, if any exception
+      // is thrown, the client must be returned/invalidated here to prevent 
pool leakage.
+      dispatched = true;
     } catch (Exception e) {
       LOGGER.warn(
           "{} failed on Node {}, because {}, retrying {}...",
           requestContext.getRequestType(),
-          nodeLocationToEndPoint(targetNode),
+          endPoint,
           e.getMessage(),
           retryCount);
+      if (handler != null) {
+        try {
+          handler.onError(e);
+        } catch (final Exception handlerException) {
+          LOGGER.warn(
+              "Failed to handle async request error for request type {} on 
node {}: {}",
+              requestContext.getRequestType(),
+              endPoint,
+              handlerException.getMessage(),
+              handlerException);
+          requestContext.getCountDownLatch().countDown();
+        }
+      } else {
+        requestContext.getCountDownLatch().countDown();
+      }
+    } finally {
+      if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
+        ((ClientManager<TEndPoint, Client>) 
clientManager).returnClient(endPoint, client);
+      }
     }
   }
 
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
new file mode 100644
index 00000000000..c01cc814b3d
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.request;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AsyncRequestManagerTest {
+
+  @Test
+  public void dispatchFailureShouldNotBlockRetryWithoutTimeout() throws Exception {
+    final TestAsyncRequestManager manager = new TestAsyncRequestManager();
+    final AsyncRequestContext<String, String, TestRequestType, TestNodeLocation> context =
+        new AsyncRequestContext<>(
+            TestRequestType.TEST,
+            "request",
+            Collections.singletonMap(1, new TestNodeLocation(new TEndPoint("localhost", 6667))));
+
+    final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    try {
+      final Future<?> future =
+          executorService.submit(() -> manager.sendAsyncRequest(context, 2, null, true));
+      future.get(3, TimeUnit.SECONDS);
+    } finally {
+      executorService.shutdownNow();
+    }
+
+    Assert.assertEquals(2, manager.getBorrowAttempts());
+    Assert.assertEquals("borrow failed", context.getResponseMap().get(1));
+    Assert.assertEquals(Collections.singletonList(1), context.getRequestIndices());
+  }
+
+  @Test
+  public void handlerErrorFailureShouldNotEscapeDispatchFailure() throws Exception {
+    final TestAsyncRequestManager manager = new TestAsyncRequestManager(false, true, true);
+    final AsyncRequestContext<String, String, TestRequestType, TestNodeLocation> context =
+        new AsyncRequestContext<>(
+            TestRequestType.TEST,
+            "request",
+            Collections.singletonMap(1, new TestNodeLocation(new TEndPoint("localhost", 6667))));
+
+    final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    try {
+      final Future<?> future =
+          executorService.submit(() -> manager.sendAsyncRequest(context, 1, null, true));
+      future.get(3, TimeUnit.SECONDS);
+    } finally {
+      executorService.shutdownNow();
+    }
+
+    Assert.assertEquals(1, manager.getBorrowAttempts());
+    Assert.assertTrue(context.getResponseMap().isEmpty());
+    Assert.assertEquals(Collections.singletonList(1), context.getRequestIndices());
+  }
+
+  private enum TestRequestType {
+    TEST
+  }
+
+  private static class TestNodeLocation {
+
+    private final TEndPoint endPoint;
+
+    private TestNodeLocation(final TEndPoint endPoint) {
+      this.endPoint = endPoint;
+    }
+  }
+
+  private static class TestAsyncRequestManager
+      extends AsyncRequestManager<TestRequestType, TestNodeLocation, Object> {
+
+    private final AtomicInteger borrowAttempts = new AtomicInteger();
+    private boolean failOnBorrow;
+    private boolean failOnDispatch;
+    private boolean failOnError;
+
+    private TestAsyncRequestManager() {
+      this(true, false, false);
+    }
+
+    private TestAsyncRequestManager(
+        final boolean failOnBorrow, final boolean failOnDispatch, final boolean failOnError) {
+      super(1);
+      this.failOnBorrow = failOnBorrow;
+      this.failOnDispatch = failOnDispatch;
+      this.failOnError = failOnError;
+    }
+
+    private int getBorrowAttempts() {
+      return borrowAttempts.get();
+    }
+
+    @Override
+    protected void initClientManager(final int selectorNumOfAsyncClientManager) {
+      clientManager =
+          new IClientManager<TEndPoint, Object>() {
+
+            @Override
+            public Object borrowClient(final TEndPoint node) throws ClientManagerException {
+              borrowAttempts.incrementAndGet();
+              if (failOnBorrow) {
+                throw new ClientManagerException("borrow failed");
+              }
+              return new Object();
+            }
+
+            @Override
+            public void clear(final TEndPoint node) {
+              // Do nothing
+            }
+
+            @Override
+            public void close() {
+              // Do nothing
+            }
+          };
+    }
+
+    @Override
+    protected void initActionMapBuilder() {
+      actionMapBuilder.put(
+          TestRequestType.TEST,
+          (request, client, handler) -> {
+            if (failOnDispatch) {
+              throw new RuntimeException("dispatch failed");
+            }
+            Assert.fail("The test client manager should fail before dispatch.");
+          });
+    }
+
+    @Override
+    protected TEndPoint nodeLocationToEndPoint(final TestNodeLocation location) {
+      return location.endPoint;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected AsyncRequestRPCHandler<?, TestRequestType, TestNodeLocation> buildHandler(
+        final AsyncRequestContext<?, ?, TestRequestType, TestNodeLocation> requestContext,
+        final int requestId,
+        final TestNodeLocation targetNode) {
+      return new TestAsyncRequestRPCHandler(
+          requestContext.getRequestType(),
+          requestId,
+          targetNode,
+          requestContext.getNodeLocationMap(),
+          (Map<Integer, String>) requestContext.getResponseMap(),
+          requestContext.getCountDownLatch(),
+          failOnError);
+    }
+  }
+
+  private static class TestAsyncRequestRPCHandler
+      extends AsyncRequestRPCHandler<String, TestRequestType, TestNodeLocation> {
+
+    private TestAsyncRequestRPCHandler(
+        final TestRequestType requestType,
+        final int requestId,
+        final TestNodeLocation targetNode,
+        final Map<Integer, TestNodeLocation> nodeLocationMap,
+        final Map<Integer, String> responseMap,
+        final CountDownLatch countDownLatch,
+        final boolean failOnError) {
+      super(requestType, requestId, targetNode, nodeLocationMap, responseMap, countDownLatch);
+      this.failOnError = failOnError;
+    }
+
+    private final boolean failOnError;
+
+    @Override
+    protected String generateFormattedTargetLocation(final TestNodeLocation location) {
+      return location.endPoint.toString();
+    }
+
+    @Override
+    public void onComplete(final String response) {
+      responseMap.put(requestId, response);
+      nodeLocationMap.remove(requestId);
+      countDownLatch.countDown();
+    }
+
+    @Override
+    public void onError(final Exception exception) {
+      if (failOnError) {
+        throw new RuntimeException("handler failed");
+      }
+      responseMap.put(requestId, exception.getMessage());
+      countDownLatch.countDown();
+    }
+  }
+}

Reply via email to