This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1df19137839 Fix pipe runtime meta push blocking user operations (#17909)
1df19137839 is described below

commit 1df19137839b3e813f1441071469c0a65afe2844
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 09:38:22 2026 +0800

    Fix pipe runtime meta push blocking user operations (#17909)
    
    * Fix pipe runtime meta push blocking user operations
    
    * Fix subscription runtime push blocking
    
    * Harden runtime meta sync best effort
    
    * Bound required metadata push waits
    
    * Fix async request dispatch failure hang
    
    * Harden async request failure handling
    
    * Use safe time conversion for runtime meta sync intervals
    
    * Harden subscription runtime meta sync timeouts
    
    * Bound pipe heartbeat metadata collection waits
    
    * Harden best-effort runtime metadata requests
    
    * Fix heartbeat client pool
    
    * Add tests and use real fail message & status
---
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  12 +-
 .../java/org/apache/iotdb/rpc/RpcUtilsTest.java    |  25 ++
 .../async/AsyncAINodeHeartbeatClientPool.java      |  12 +-
 .../async/AsyncConfigNodeHeartbeatClientPool.java  |  12 +-
 .../async/AsyncDataNodeHeartbeatClientPool.java    |  22 +-
 .../rpc/CheckTimeSeriesExistenceRPCHandler.java    |   2 +-
 .../rpc/CountPathsUsingTemplateRPCHandler.java     |   2 +-
 .../rpc/FetchSchemaBlackListRPCHandler.java        |   2 +-
 .../async/handlers/rpc/SchemaUpdateRPCHandler.java |   2 +-
 .../CheckSchemaRegionUsingTemplateRPCHandler.java  |   2 +-
 .../manager/load/service/TopologyService.java      |  15 +-
 .../runtime/heartbeat/PipeHeartbeatScheduler.java  |  13 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      | 332 +++++++++++++++++----
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  22 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |   2 +-
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |   6 +-
 .../AbstractOperateSubscriptionProcedure.java      |  20 ++
 .../runtime/CommitProgressSyncProcedure.java       |  10 +-
 .../runtime/ConsumerGroupMetaSyncProcedure.java    |  10 +-
 .../SubscriptionHandleLeaderChangeProcedure.java   |   7 +-
 .../topic/runtime/TopicMetaSyncProcedure.java      |   9 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  13 +-
 .../GeneralRegionAttributeSecurityService.java     |  67 ++---
 .../client/request/AsyncRequestManager.java        |  19 +-
 .../client/request/AsyncRequestManagerTest.java    | 226 ++++++++++++++
 26 files changed, 728 insertions(+), 138 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 1eb30967405..9c88bf3f8b7 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -151,8 +151,7 @@ public class RpcUtils {
   }
 
   public static void verifySuccess(List<TSStatus> statuses) throws 
BatchExecutionException {
-    StringBuilder errMsgs =
-        new 
StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": 
");
+    StringBuilder errMsgs = new StringBuilder();
     for (TSStatus status : statuses) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
           && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -160,7 +159,8 @@ public class RpcUtils {
       }
     }
     if (errMsgs.length() > 0) {
-      throw new BatchExecutionException(statuses, errMsgs.toString());
+      throw new BatchExecutionException(
+          statuses, TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ": " + 
errMsgs);
     }
   }
 
@@ -181,9 +181,9 @@ public class RpcUtils {
       for (TSStatus subStatus : statusList) {
         if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && subStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
-          if (!msgSet.contains(status)) {
-            errMsg.append(status).append("; ");
-            msgSet.add(status);
+          if (!msgSet.contains(subStatus)) {
+            errMsg.append(subStatus).append("; ");
+            msgSet.add(subStatus);
           }
         }
       }
diff --git a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
index a6980276074..940c50453a2 100644
--- a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
+++ b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
@@ -19,11 +19,15 @@
 
 package org.apache.iotdb.rpc;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
 
 public class RpcUtilsTest {
 
@@ -74,4 +78,25 @@ public class RpcUtilsTest {
     Assert.assertFalse(RpcUtils.isSetSqlDialect("setsql_dialect =table"));
     Assert.assertFalse(RpcUtils.isSetSqlDialect("set           sql_dia"));
   }
+
+  @Test
+  public void testVerifySuccessListAllowsSuccessfulStatuses() throws 
BatchExecutionException {
+    RpcUtils.verifySuccess(
+        Arrays.asList(
+            RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+            RpcUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND)));
+  }
+
+  @Test
+  public void testVerifySuccessListThrowsOnFailure() {
+    TSStatus failedStatus = 
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "failed");
+
+    try {
+      RpcUtils.verifySuccess(Collections.singletonList(failedStatus));
+      Assert.fail("Expected BatchExecutionException");
+    } catch (BatchExecutionException e) {
+      Assert.assertEquals(Collections.singletonList(failedStatus), 
e.getStatusList());
+      Assert.assertTrue(e.getMessage().contains("failed"));
+    }
+  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index d15b45cbcd5..5c4b4c3175e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -54,8 +54,8 @@ public class AsyncAINodeHeartbeatClientPool {
       client = clientManager.borrowClient(endPoint);
       client.getAIHeartbeat(req, handler);
       dispatched = true;
-    } catch (Exception ignore) {
-      // Just ignore
+    } catch (Exception e) {
+      handleError(handler, e);
     } finally {
       // After the async call is dispatched, the client's onComplete/onError 
callback is
       // responsible for returning the client. If the RPC was not dispatched 
(exception
@@ -67,6 +67,14 @@ public class AsyncAINodeHeartbeatClientPool {
     }
   }
 
+  private void handleError(final AINodeHeartbeatHandler handler, final 
Exception e) {
+    try {
+      handler.onError(e);
+    } catch (final Exception ignore) {
+      // Ignore handler failures in heartbeat best-effort path.
+    }
+  }
+
   private static class AsyncAINodeHeartbeatClientPoolHolder {
 
     private static final AsyncAINodeHeartbeatClientPool INSTANCE =
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index 5cf716d0dd1..ab30935753b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -55,8 +55,8 @@ public class AsyncConfigNodeHeartbeatClientPool {
       client = clientManager.borrowClient(endPoint);
       client.getConfigNodeHeartBeat(heartbeatReq, handler);
       dispatched = true;
-    } catch (Exception ignore) {
-      // Just ignore
+    } catch (Exception e) {
+      handleError(handler, e);
     } finally {
       // After the async call is dispatched, the client's onComplete/onError 
callback is
       // responsible for returning the client. If the RPC was not dispatched 
(exception
@@ -68,6 +68,14 @@ public class AsyncConfigNodeHeartbeatClientPool {
     }
   }
 
+  private void handleError(final ConfigNodeHeartbeatHandler handler, final 
Exception e) {
+    try {
+      handler.onError(e);
+    } catch (final Exception ignore) {
+      // Ignore handler failures in heartbeat best-effort path.
+    }
+  }
+
   private static class AsyncConfigNodeHeartbeatClientPoolHolder {
 
     private static final AsyncConfigNodeHeartbeatClientPool INSTANCE =
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index d31ec405e0c..516596a61b8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -56,8 +56,8 @@ public class AsyncDataNodeHeartbeatClientPool {
       client = clientManager.borrowClient(endPoint);
       client.getDataNodeHeartBeat(req, handler);
       dispatched = true;
-    } catch (Exception ignore) {
-      // Just ignore
+    } catch (Exception e) {
+      handleError(handler, e);
     } finally {
       returnClientIfNotDispatched(endPoint, client, dispatched);
     }
@@ -72,7 +72,7 @@ public class AsyncDataNodeHeartbeatClientPool {
       client.writeAuditLog(req, handler);
       dispatched = true;
     } catch (Exception e) {
-      // Just ignore
+      handleError(handler, e);
     } finally {
       returnClientIfNotDispatched(endPoint, client, dispatched);
     }
@@ -89,6 +89,22 @@ public class AsyncDataNodeHeartbeatClientPool {
     }
   }
 
+  private void handleError(final DataNodeHeartbeatHandler handler, final 
Exception e) {
+    try {
+      handler.onError(e);
+    } catch (final Exception ignore) {
+      // Ignore handler failures in heartbeat best-effort path.
+    }
+  }
+
+  private void handleError(final DataNodeWriteAuditLogHandler handler, final 
Exception e) {
+    try {
+      handler.onError(e);
+    } catch (final Exception ignore) {
+      // Ignore handler failures in audit-log best-effort path.
+    }
+  }
+
   private static class AsyncDataNodeHeartbeatClientPoolHolder {
 
     private static final AsyncDataNodeHeartbeatClientPool INSTANCE =
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
index 3a1c47e1746..918d206fc37 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
@@ -83,11 +83,11 @@ public class CheckTimeSeriesExistenceRPCHandler
             + e.getMessage();
     LOGGER.error(errorMsg);
 
-    countDownLatch.countDown();
     TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
     resp.setStatus(
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
     responseMap.put(requestId, resp);
+    countDownLatch.countDown();
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
index 45574e131e7..ea2b342f597 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
@@ -83,11 +83,11 @@ public class CountPathsUsingTemplateRPCHandler
             + e.getMessage();
     LOGGER.error(errorMsg);
 
-    countDownLatch.countDown();
     TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
     resp.setStatus(
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
     responseMap.put(requestId, resp);
+    countDownLatch.countDown();
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
index 44f8362585c..a0f8957bbb4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
@@ -83,11 +83,11 @@ public class FetchSchemaBlackListRPCHandler
             + e.getMessage();
     LOGGER.error(errorMsg);
 
-    countDownLatch.countDown();
     TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
     resp.setStatus(
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
     responseMap.put(requestId, resp);
+    countDownLatch.countDown();
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
index 84d848a484e..ea1de3b6274 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
@@ -77,10 +77,10 @@ public class SchemaUpdateRPCHandler extends 
DataNodeTSStatusRPCHandler {
             + e.getMessage();
     LOGGER.warn(errorMsg);
 
-    countDownLatch.countDown();
     responseMap.put(
         requestId,
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
+    countDownLatch.countDown();
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
index 6d3d1bf5007..e2b1d7bc836 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
@@ -85,11 +85,11 @@ public class CheckSchemaRegionUsingTemplateRPCHandler
             + e.getMessage();
     LOGGER.error(errorMsg);
 
-    countDownLatch.countDown();
     TCheckSchemaRegionUsingTemplateResp resp = new 
TCheckSchemaRegionUsingTemplateResp();
     resp.setStatus(
         new TSStatus(
             
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), 
errorMsg)));
     responseMap.put(requestId, resp);
+    countDownLatch.countDown();
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
index 6d334521ded..b99cbd00017 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.ratis.util.AwaitForSignal;
 import org.apache.tsfile.utils.Pair;
@@ -72,6 +73,7 @@ import java.util.stream.Collectors;
 public class TopologyService implements Runnable, IClusterStatusSubscriber {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
   private static final int SAMPLING_WINDOW_SIZE = 100;
+  private static final int TOPOLOGY_PROBING_RETRY_NUM = 1;
 
   private final ExecutorService topologyThread =
       IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -222,7 +224,7 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
                 nodeLocations,
                 proberLocationMap);
     CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout);
+        .sendAsyncRequest(dataNodeAsyncRequestContext, 
TOPOLOGY_PROBING_RETRY_NUM, timeout, true);
     final List<TTestConnectionResult> results = new ArrayList<>();
     dataNodeAsyncRequestContext
         .getResponseMap()
@@ -360,15 +362,18 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
     }
 
     CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestWithTimeoutInMs(context, 
CONF.getTopologyProbingBaseIntervalInMs());
+        .sendAsyncRequest(
+            context, TOPOLOGY_PROBING_RETRY_NUM, 
CONF.getTopologyProbingBaseIntervalInMs(), true);
 
     context
         .getResponseMap()
         .forEach(
             (nodeId, resp) -> {
-              Set<Integer> reachableSet =
-                  computedTopology.getOrDefault(nodeId, 
Collections.emptySet());
-              lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
+              if (resp.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                Set<Integer> reachableSet =
+                    computedTopology.getOrDefault(nodeId, 
Collections.emptySet());
+                lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
+              }
             });
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 91d36f4e441..b9276880cea 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -52,6 +52,7 @@ public class PipeHeartbeatScheduler {
       PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled();
   private static final long HEARTBEAT_INTERVAL_SECONDS =
       
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
+  private static final int PIPE_HEARTBEAT_RETRY_NUM = 1;
 
   private static final ScheduledExecutorService HEARTBEAT_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
@@ -102,12 +103,8 @@ public class PipeHeartbeatScheduler {
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_HEARTBEAT, request, 
dataNodeLocationMap);
     CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
-                * 1000L
-                * 2
-                / 3);
+        .sendAsyncRequest(
+            clientHandler, PIPE_HEARTBEAT_RETRY_NUM, 
getPipeHeartbeatRequestTimeoutInMs(), true);
     clientHandler
         .getResponseMap()
         .forEach(
@@ -137,6 +134,10 @@ public class PipeHeartbeatScheduler {
     }
   }
 
+  private static long getPipeHeartbeatRequestTimeoutInMs() {
+    return TimeUnit.SECONDS.toMillis(HEARTBEAT_INTERVAL_SECONDS) * 2 / 3;
+  }
+
   public synchronized void stop() {
     if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) {
       heartbeatFuture.cancel(false);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 4105b8ecf5c..ab4f367df92 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
@@ -110,6 +111,8 @@ public class ConfigNodeProcedureEnv {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ConfigNodeProcedureEnv.class);
 
+  private static final int RUNTIME_META_PUSH_RETRY_NUM = 1;
+
   /** Add or remove node lock. */
   private final LockQueue nodeLock = new LockQueue();
 
@@ -655,10 +658,23 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushPipeMetaReq, TPushPipeMetaResp> 
clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
+    return clientHandler.getResponseMap();
+  }
+
+  public Map<Integer, TPushPipeMetaResp> pushAllPipeMetaToDataNodesBestEffort(
+      List<ByteBuffer> pipeMetaBinaryList) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPushPipeMetaReq request = new 
TPushPipeMetaReq().setPipeMetas(pipeMetaBinaryList);
+
+    final DataNodeAsyncRequestContext<TPushPipeMetaReq, TPushPipeMetaResp> 
clientHandler =
+        new DataNodeAsyncRequestContext<>(
+            CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request, 
dataNodeLocationMap);
+    final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -670,10 +686,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushSinglePipeMetaReq, 
TPushPipeMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -686,10 +701,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushSinglePipeMetaReq, 
TPushPipeMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -703,10 +717,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushMultiPipeMetaReq, 
TPushPipeMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -719,10 +732,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushMultiPipeMetaReq, 
TPushPipeMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -735,10 +747,23 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushTopicMetaReq, TPushTopicMetaResp> 
clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
+    return clientHandler.getResponseMap();
+  }
+
+  public Map<Integer, TPushTopicMetaResp> 
pushAllTopicMetaToDataNodesBestEffort(
+      List<ByteBuffer> topicMetaBinaryList) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPushTopicMetaReq request = new 
TPushTopicMetaReq().setTopicMetas(topicMetaBinaryList);
+
+    final DataNodeAsyncRequestContext<TPushTopicMetaReq, TPushTopicMetaResp> 
clientHandler =
+        new DataNodeAsyncRequestContext<>(
+            CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request, 
dataNodeLocationMap);
+    final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -750,7 +775,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushSingleTopicMetaReq, 
TPushTopicMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseList().stream()
         .map(TPushTopicMetaResp::getStatus)
         .collect(Collectors.toList());
@@ -765,7 +792,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushSingleTopicMetaReq, 
TPushTopicMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseList().stream()
         .map(TPushTopicMetaResp::getStatus)
         .collect(Collectors.toList());
@@ -781,10 +810,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushMultiTopicMetaReq, 
TPushTopicMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -797,10 +825,9 @@ public class ConfigNodeProcedureEnv {
     final DataNodeAsyncRequestContext<TPushMultiTopicMetaReq, 
TPushTopicMetaResp> clientHandler =
         new DataNodeAsyncRequestContext<>(
             CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -815,10 +842,25 @@ public class ConfigNodeProcedureEnv {
         clientHandler =
             new DataNodeAsyncRequestContext<>(
                 CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
+    return clientHandler.getResponseMap();
+  }
+
+  public Map<Integer, TPushConsumerGroupMetaResp> 
pushAllConsumerGroupMetaToDataNodesBestEffort(
+      List<ByteBuffer> consumerGroupMetaBinaryList) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPushConsumerGroupMetaReq request =
+        new 
TPushConsumerGroupMetaReq().setConsumerGroupMetas(consumerGroupMetaBinaryList);
+
+    final DataNodeAsyncRequestContext<TPushConsumerGroupMetaReq, 
TPushConsumerGroupMetaResp>
+        clientHandler =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request, 
dataNodeLocationMap);
+    final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+    fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -834,7 +876,9 @@ public class ConfigNodeProcedureEnv {
                 CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
                 request,
                 dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseList().stream()
         .map(TPushConsumerGroupMetaResp::getStatus)
         .collect(Collectors.toList());
@@ -852,7 +896,9 @@ public class ConfigNodeProcedureEnv {
                 CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
                 request,
                 dataNodeLocationMap);
-    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseList().stream()
         .map(TPushConsumerGroupMetaResp::getStatus)
         .collect(Collectors.toList());
@@ -867,10 +913,23 @@ public class ConfigNodeProcedureEnv {
         clientHandler =
             new DataNodeAsyncRequestContext<>(
                 CnToDnAsyncRequestType.PULL_COMMIT_PROGRESS, request, 
dataNodeLocationMap);
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
+    final long timeoutInMs = 
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+    sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+    fillMissingPullCommitProgressResponses(clientHandler, timeoutInMs);
+    return clientHandler.getResponseMap();
+  }
+
+  public Map<Integer, TPullCommitProgressResp> 
pullCommitProgressFromDataNodesBestEffort() {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPullCommitProgressReq request = new TPullCommitProgressReq();
+
+    final DataNodeAsyncRequestContext<TPullCommitProgressReq, 
TPullCommitProgressResp>
+        clientHandler =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.PULL_COMMIT_PROGRESS, request, 
dataNodeLocationMap);
+    final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+    fillMissingPullCommitProgressResponses(clientHandler, timeoutInMs);
     return clientHandler.getResponseMap();
   }
 
@@ -884,8 +943,12 @@ public class ConfigNodeProcedureEnv {
     final Set<Integer> readableDataNodeIds =
         
getLoadManager().filterDataNodeThroughStatus(NodeStatus::isReadable).stream()
             .collect(Collectors.toSet());
-    final DataNodeAsyncRequestContext<TPushSubscriptionRuntimeReq, TSStatus> 
clientHandler =
-        new 
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME);
+    final DataNodeAsyncRequestContext<TPushSubscriptionRuntimeReq, TSStatus>
+        readableDataNodeClientHandler =
+            new 
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME);
+    final DataNodeAsyncRequestContext<TPushSubscriptionRuntimeReq, TSStatus>
+        unreadableDataNodeClientHandler =
+            new 
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME);
 
     dataNodeLocationMap.forEach(
         (dataNodeId, dataNodeLocation) -> {
@@ -919,16 +982,25 @@ public class ConfigNodeProcedureEnv {
                         preferredWriterNodeId == dataNodeId,
                         new ArrayList<>(activeWriterNodeIds)));
               });
+          final DataNodeAsyncRequestContext<TPushSubscriptionRuntimeReq, 
TSStatus> clientHandler =
+              readableDataNodeIds.contains(dataNodeId)
+                  ? readableDataNodeClientHandler
+                  : unreadableDataNodeClientHandler;
           clientHandler.putNodeLocation(dataNodeId, dataNodeLocation);
           clientHandler.putRequest(
               dataNodeId, new 
TPushSubscriptionRuntimeReq().setRuntimeStates(runtimeStates));
         });
 
-    CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
-            clientHandler,
-            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 
60 * 1000 * 2 / 3);
-    return clientHandler.getResponseMap();
+    final long readableTimeoutInMs = 
sendRequiredRuntimeMetaRequest(readableDataNodeClientHandler);
+    fillMissingSubscriptionRuntimeResponses(readableDataNodeClientHandler, 
readableTimeoutInMs);
+    final long unreadableTimeoutInMs =
+        sendBestEffortRuntimeMetaRequest(unreadableDataNodeClientHandler);
+    fillMissingSubscriptionRuntimeResponses(unreadableDataNodeClientHandler, 
unreadableTimeoutInMs);
+
+    final Map<Integer, TSStatus> responseMap =
+        new HashMap<>(readableDataNodeClientHandler.getResponseMap());
+    responseMap.putAll(unreadableDataNodeClientHandler.getResponseMap());
+    return responseMap;
   }
 
   private boolean isRuntimeActiveWriterNode(final int dataNodeId) {
@@ -937,6 +1009,164 @@ public class ConfigNodeProcedureEnv {
         && getLoadManager().getNodeStatus(dataNodeId) != NodeStatus.Removing;
   }
 
+  private static long sendBestEffortRuntimeMetaRequest(
+      final DataNodeAsyncRequestContext<?, ?> clientHandler) {
+    final long timeoutInMs = getRuntimeMetaPushTimeoutInMs();
+    sendRuntimeMetaRequest(clientHandler, true, timeoutInMs);
+    return timeoutInMs;
+  }
+
+  private static long sendRequiredRuntimeMetaRequest(
+      final DataNodeAsyncRequestContext<?, ?> clientHandler) {
+    final long timeoutInMs = getRuntimeMetaPushTimeoutInMs();
+    sendRuntimeMetaRequest(clientHandler, false, timeoutInMs);
+    return timeoutInMs;
+  }
+
+  private static void sendRequiredMetadataRequest(
+      final DataNodeAsyncRequestContext<?, ?> clientHandler, final long 
timeoutInMs) {
+    sendRuntimeMetaRequest(clientHandler, false, timeoutInMs);
+  }
+
+  private static void sendRuntimeMetaRequest(
+      final DataNodeAsyncRequestContext<?, ?> clientHandler, final boolean 
keepSilent) {
+    sendRuntimeMetaRequest(clientHandler, keepSilent, 
getRuntimeMetaPushTimeoutInMs());
+  }
+
+  private static void sendRuntimeMetaRequest(
+      final DataNodeAsyncRequestContext<?, ?> clientHandler,
+      final boolean keepSilent,
+      final long timeoutInMs) {
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequest(clientHandler, RUNTIME_META_PUSH_RETRY_NUM, 
timeoutInMs, keepSilent);
+  }
+
+  private static void fillMissingPipePushMetaResponses(
+      final DataNodeAsyncRequestContext<?, TPushPipeMetaResp> clientHandler,
+      final long timeoutInMs) {
+    clientHandler
+        .getRequestIndices()
+        .forEach(
+            dataNodeId ->
+                clientHandler
+                    .getResponseMap()
+                    .putIfAbsent(
+                        dataNodeId,
+                        new TPushPipeMetaResp(
+                            createMetadataTimeoutStatus(
+                                clientHandler.getRequestType(),
+                                dataNodeId,
+                                TSStatusCode.PIPE_PUSH_META_ERROR,
+                                timeoutInMs))));
+  }
+
+  private static void fillMissingTopicPushMetaResponses(
+      final DataNodeAsyncRequestContext<?, TPushTopicMetaResp> clientHandler,
+      final long timeoutInMs) {
+    clientHandler
+        .getRequestIndices()
+        .forEach(
+            dataNodeId ->
+                clientHandler
+                    .getResponseMap()
+                    .putIfAbsent(
+                        dataNodeId,
+                        new TPushTopicMetaResp(
+                            createMetadataTimeoutStatus(
+                                clientHandler.getRequestType(),
+                                dataNodeId,
+                                TSStatusCode.TOPIC_PUSH_META_ERROR,
+                                timeoutInMs))));
+  }
+
+  private static void fillMissingConsumerGroupPushMetaResponses(
+      final DataNodeAsyncRequestContext<?, TPushConsumerGroupMetaResp> 
clientHandler,
+      final long timeoutInMs) {
+    clientHandler
+        .getRequestIndices()
+        .forEach(
+            dataNodeId ->
+                clientHandler
+                    .getResponseMap()
+                    .putIfAbsent(
+                        dataNodeId,
+                        new TPushConsumerGroupMetaResp(
+                            createMetadataTimeoutStatus(
+                                clientHandler.getRequestType(),
+                                dataNodeId,
+                                TSStatusCode.CONSUMER_PUSH_META_ERROR,
+                                timeoutInMs))));
+  }
+
+  private static void fillMissingPullCommitProgressResponses(
+      final DataNodeAsyncRequestContext<?, TPullCommitProgressResp> 
clientHandler,
+      final long timeoutInMs) {
+    clientHandler
+        .getRequestIndices()
+        .forEach(
+            dataNodeId ->
+                clientHandler
+                    .getResponseMap()
+                    .putIfAbsent(
+                        dataNodeId,
+                        new TPullCommitProgressResp(
+                            createMetadataTimeoutStatus(
+                                clientHandler.getRequestType(),
+                                dataNodeId,
+                                TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                                timeoutInMs))));
+  }
+
+  private static void fillMissingSubscriptionRuntimeResponses(
+      final DataNodeAsyncRequestContext<?, TSStatus> clientHandler, final long 
timeoutInMs) {
+    clientHandler
+        .getRequestIndices()
+        .forEach(
+            dataNodeId ->
+                clientHandler
+                    .getResponseMap()
+                    .putIfAbsent(
+                        dataNodeId,
+                        createMetadataTimeoutStatus(
+                            clientHandler.getRequestType(),
+                            dataNodeId,
+                            TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                            timeoutInMs)));
+  }
+
+  private static TSStatus createMetadataTimeoutStatus(
+      final CnToDnAsyncRequestType requestType,
+      final int dataNodeId,
+      final TSStatusCode statusCode,
+      final long timeoutInMs) {
+    return new TSStatus(statusCode.getStatusCode())
+        .setMessage(
+            String.format(
+                "Failed to %s on DataNode %s before metadata request timeout 
%sms",
+                requestType, dataNodeId, timeoutInMs));
+  }
+
+  private static long getRuntimeMetaPushTimeoutInMs() {
+    return TimeUnit.SECONDS.toMillis(
+            
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta())
+        * 2
+        / 3;
+  }
+
+  private static long getRequiredPipeMetadataRequestTimeoutInMs() {
+    return TimeUnit.MINUTES.toMillis(
+            PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes())
+        * 2
+        / 3;
+  }
+
+  private static long getRequiredSubscriptionMetadataRequestTimeoutInMs() {
+    return TimeUnit.MINUTES.toMillis(
+            
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+        * 2
+        / 3;
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 5679b36ffc5..0e72737f265 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -487,10 +487,11 @@ public abstract class AbstractOperatePipeProcedureV2
 
       if (resp.getStatus().getCode() == 
TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) {
         if (!resp.isSetExceptionMessages()) {
+          final String statusMessage = resp.getStatus().getMessage();
           exceptionMessageBuilder.append(
               String.format(
-                  "DataNodeId: %s, Message: Internal error while processing 
pushPipeMeta on dataNodes.",
-                  dataNodeId));
+                  "DataNodeId: %s, Message: Internal error while processing 
pushPipeMeta on dataNodes.%s",
+                  dataNodeId, statusMessage == null ? "" : " " + 
statusMessage));
           continue;
         }
 
@@ -538,6 +539,23 @@ public abstract class AbstractOperatePipeProcedureV2
     }
   }
 
+  protected Map<Integer, TPushPipeMetaResp> 
pushPipeMetaToDataNodesBestEffortAndGetResponse(
+      ConfigNodeProcedureEnv env) throws IOException {
+    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+    for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
+      
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
+    }
+    return env.pushAllPipeMetaToDataNodesBestEffort(pipeMetaBinaryList);
+  }
+
+  protected void pushPipeMetaToDataNodesBestEffort(ConfigNodeProcedureEnv env) 
{
+    try {
+      pushPipeMetaToDataNodesBestEffortAndGetResponse(env);
+    } catch (Exception e) {
+      
LOGGER.info(ProcedureMessages.FAILED_TO_PUSH_PIPE_META_LIST_TO_DATA_NODES_WILL, 
e);
+    }
+  }
+
   /**
    * Pushing one pipeMeta to all the dataNodes, forcing an update to the 
pipe's runtime state.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 93653923375..a8399b281b4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -124,7 +124,7 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
         LOGGER::info,
         
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
 
-    pushPipeMetaToDataNodesIgnoreException(env);
+    pushPipeMetaToDataNodesBestEffort(env);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 394f7e2fa9f..08468f3d04d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -140,7 +140,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
       return;
     }
 
-    pushPipeMetaToDataNodesIgnoreException(env);
+    pushPipeMetaToDataNodesBestEffort(env);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 072555146c1..d371b7e9b20 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -52,6 +52,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -65,7 +66,8 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeMetaSyncProcedure.class);
 
   private static final long MIN_EXECUTION_INTERVAL_MS =
-      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 
1000 / 2;
+      
TimeUnit.MINUTES.toMillis(PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes())
+          / 2;
   // No need to serialize this field
   private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
 
@@ -205,7 +207,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     
LOGGER.debug(ProcedureMessages.PIPEMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES);
 
-    Map<Integer, TPushPipeMetaResp> respMap = pushPipeMetaToDataNodes(env);
+    Map<Integer, TPushPipeMetaResp> respMap = 
pushPipeMetaToDataNodesBestEffortAndGetResponse(env);
     if (pipeTaskInfo.get().recordDataNodePushPipeMetaExceptions(respMap)) {
       throw new PipeException(
           String.format(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 9b0bd21750d..d6fa0de5d20 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -350,6 +350,16 @@ public abstract class AbstractOperateSubscriptionProcedure
     return env.pushAllTopicMetaToDataNodes(topicMetaBinaryList);
   }
 
+  protected Map<Integer, TPushTopicMetaResp> 
pushTopicMetaToDataNodesBestEffort(
+      ConfigNodeProcedureEnv env) throws IOException {
+    final List<ByteBuffer> topicMetaBinaryList = new ArrayList<>();
+    for (TopicMeta topicMeta : subscriptionInfo.get().getAllTopicMeta()) {
+      topicMetaBinaryList.add(topicMeta.serialize());
+    }
+
+    return env.pushAllTopicMetaToDataNodesBestEffort(topicMetaBinaryList);
+  }
+
   public static boolean pushTopicMetaHasException(Map<Integer, 
TPushTopicMetaResp> respMap) {
     for (TPushTopicMetaResp resp : respMap.values()) {
       if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -376,6 +386,16 @@ public abstract class AbstractOperateSubscriptionProcedure
     return 
env.pushAllConsumerGroupMetaToDataNodes(consumerGroupMetaBinaryList);
   }
 
+  protected Map<Integer, TPushConsumerGroupMetaResp> 
pushConsumerGroupMetaToDataNodesBestEffort(
+      ConfigNodeProcedureEnv env) throws IOException {
+    final List<ByteBuffer> consumerGroupMetaBinaryList = new ArrayList<>();
+    for (ConsumerGroupMeta consumerGroupMeta : 
subscriptionInfo.get().getAllConsumerGroupMeta()) {
+      consumerGroupMetaBinaryList.add(consumerGroupMeta.serialize());
+    }
+
+    return 
env.pushAllConsumerGroupMetaToDataNodesBestEffort(consumerGroupMetaBinaryList);
+  }
+
   public static boolean pushConsumerGroupMetaHasException(
       Map<Integer, TPushConsumerGroupMetaResp> respMap) {
     for (TPushConsumerGroupMetaResp resp : respMap.values()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java
index e9b3056e662..348412af106 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java
@@ -20,7 +20,7 @@
 package 
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import 
org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.CommitProgressHandleMetaChangePlan;
 import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -47,6 +47,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -59,7 +60,9 @@ public class CommitProgressSyncProcedure extends 
AbstractOperateSubscriptionProc
   private static final Logger LOGGER = 
LoggerFactory.getLogger(CommitProgressSyncProcedure.class);
 
   private static final long MIN_EXECUTION_INTERVAL_MS =
-      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 
1000 / 2;
+      TimeUnit.MINUTES.toMillis(
+              
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+          / 2;
   private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
 
   public CommitProgressSyncProcedure() {
@@ -106,7 +109,8 @@ public class CommitProgressSyncProcedure extends 
AbstractOperateSubscriptionProc
     LOGGER.info("CommitProgressSyncProcedure: 
executeFromOperateOnConfigNodes");
 
     // 1. Pull commit progress from all DataNodes
-    final Map<Integer, TPullCommitProgressResp> respMap = 
env.pullCommitProgressFromDataNodes();
+    final Map<Integer, TPullCommitProgressResp> respMap =
+        env.pullCommitProgressFromDataNodesBestEffort();
 
     // 2. Merge all DataNode responses with existing progress using Math::max
     final Map<String, RegionProgress> mergedRegionProgress =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index 20a1173ac60..8e14b26e260 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -20,7 +20,7 @@
 package 
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
 import 
org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
 import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
@@ -44,6 +44,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -53,7 +54,9 @@ public class ConsumerGroupMetaSyncProcedure extends 
AbstractOperateSubscriptionP
       LoggerFactory.getLogger(ConsumerGroupMetaSyncProcedure.class);
 
   private static final long MIN_EXECUTION_INTERVAL_MS =
-      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 
1000 / 2;
+      TimeUnit.MINUTES.toMillis(
+              
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+          / 2;
   // No need to serialize this field
   private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
 
@@ -129,7 +132,8 @@ public class ConsumerGroupMetaSyncProcedure extends 
AbstractOperateSubscriptionP
       throws SubscriptionException, IOException {
     
LOGGER.info(ProcedureMessages.CONSUMERGROUPMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES);
 
-    Map<Integer, TPushConsumerGroupMetaResp> respMap = 
pushConsumerGroupMetaToDataNodes(env);
+    Map<Integer, TPushConsumerGroupMetaResp> respMap =
+        pushConsumerGroupMetaToDataNodesBestEffort(env);
     if (pushConsumerGroupMetaHasException(respMap)) {
       throw new SubscriptionException(
           String.format(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
index 5337b719d20..09414b0e3a8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
@@ -104,7 +104,8 @@ public class SubscriptionHandleLeaderChangeProcedure 
extends AbstractOperateSubs
       throws SubscriptionException {
     LOGGER.info("SubscriptionHandleLeaderChangeProcedure: 
executeFromOperateOnConfigNodes");
 
-    final Map<Integer, TPullCommitProgressResp> respMap = 
env.pullCommitProgressFromDataNodes();
+    final Map<Integer, TPullCommitProgressResp> respMap =
+        env.pullCommitProgressFromDataNodesBestEffort();
     final Map<String, RegionProgress> mergedRegionProgress =
         deserializeRegionProgressMap(
             
subscriptionInfo.get().getCommitProgressKeeper().getAllRegionProgress());
@@ -158,7 +159,7 @@ public class SubscriptionHandleLeaderChangeProcedure 
extends AbstractOperateSubs
       throws SubscriptionException, IOException {
     LOGGER.info("SubscriptionHandleLeaderChangeProcedure: 
executeFromOperateOnDataNodes");
 
-    final Map<Integer, TPushTopicMetaResp> topicRespMap = 
pushTopicMetaToDataNodes(env);
+    final Map<Integer, TPushTopicMetaResp> topicRespMap = 
pushTopicMetaToDataNodesBestEffort(env);
     topicRespMap.forEach(
         (dataNodeId, resp) -> {
           if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -170,7 +171,7 @@ public class SubscriptionHandleLeaderChangeProcedure 
extends AbstractOperateSubs
         });
 
     final Map<Integer, TPushConsumerGroupMetaResp> consumerGroupRespMap =
-        pushConsumerGroupMetaToDataNodes(env);
+        pushConsumerGroupMetaToDataNodesBestEffort(env);
     consumerGroupRespMap.forEach(
         (dataNodeId, resp) -> {
           if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index b7432a2c588..d1ba988db63 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import 
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
 import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
@@ -44,6 +44,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -52,7 +53,9 @@ public class TopicMetaSyncProcedure extends AbstractOperateSubscriptionProcedure
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TopicMetaSyncProcedure.class);
 
   private static final long MIN_EXECUTION_INTERVAL_MS =
-      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 * 
1000 / 2;
+      TimeUnit.MINUTES.toMillis(
+              
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+          / 2;
   // No need to serialize this field
   private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
 
@@ -129,7 +132,7 @@ public class TopicMetaSyncProcedure extends AbstractOperateSubscriptionProcedure
       throws SubscriptionException, IOException {
     
LOGGER.info(ProcedureMessages.TOPICMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES);
 
-    Map<Integer, TPushTopicMetaResp> respMap = pushTopicMetaToDataNodes(env);
+    Map<Integer, TPushTopicMetaResp> respMap = 
pushTopicMetaToDataNodesBestEffort(env);
     if (pushTopicMetaHasException(respMap)) {
       throw new SubscriptionException(
           
String.format(ProcedureMessages.FAILED_TO_PUSH_TOPIC_META_TO_DATANODES_DETAILS, 
respMap));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2af869ab2f9..c588c94c7b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -435,6 +435,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
   private static final long TEST_CONNECTION_TIMEOUT_MS =
       CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
+  private static final int TEST_CONNECTION_RETRY_NUM = 1;
 
   private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR =
       IoTDBThreadPoolFactory.newFixedThreadPool(
@@ -2222,7 +2223,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
         DnToCnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToCnRequestType, 
TConfigNodeLocation> handler) ->
             DnToCnInternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
+                .sendAsyncRequest(
+                    handler, TEST_CONNECTION_RETRY_NUM, 
TEST_CONNECTION_TIMEOUT_MS, true));
   }
 
   private List<TTestConnectionResult> 
testAllDataNodeConnectionInHeartbeatChannel(
@@ -2248,7 +2250,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
             DnToDnInternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
+                .sendAsyncRequest(
+                    handler, TEST_CONNECTION_RETRY_NUM, 
TEST_CONNECTION_TIMEOUT_MS, true));
   }
 
   private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -2261,7 +2264,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
             DataNodeMPPServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
+                .sendAsyncRequest(
+                    handler, TEST_CONNECTION_RETRY_NUM, 
TEST_CONNECTION_TIMEOUT_MS, true));
   }
 
   private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -2274,7 +2278,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
             DataNodeExternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
+                .sendAsyncRequest(
+                    handler, TEST_CONNECTION_RETRY_NUM, 
TEST_CONNECTION_TIMEOUT_MS, true));
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
index db23436e520..4f668a19075 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
@@ -67,6 +67,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -77,6 +78,7 @@ public class GeneralRegionAttributeSecurityService extends AbstractPeriodicalSer
       LoggerFactory.getLogger(GeneralRegionAttributeSecurityService.class);
 
   private static final IoTDBConfig iotdbConfig = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final int ATTRIBUTE_UPDATE_RETRY_NUM = 1;
   private final Map<Integer, Pair<Long, Integer>> 
dataNodeId2FailureDurationAndTimesMap =
       new HashMap<>();
   private final Set<ISchemaRegion> regionLeaders =
@@ -214,46 +216,42 @@ public class GeneralRegionAttributeSecurityService extends AbstractPeriodicalSer
                                   ByteBuffer.wrap(bytes)));
                     }));
 
-    DnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestWithTimeoutInMs(
-            clientHandler,
+    final long timeoutInMs =
+        TimeUnit.SECONDS.toMillis(
             IoTDBDescriptor.getInstance()
                 .getConfig()
                 .getGeneralRegionAttributeSecurityServiceTimeoutSeconds());
+    DnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequest(clientHandler, ATTRIBUTE_UPDATE_RETRY_NUM, 
timeoutInMs);
 
     final AtomicBoolean needFetch = new AtomicBoolean(false);
+    final Set<Integer> failedDataNodes = new 
HashSet<>(clientHandler.getRequestIndices());
+    final long failureDurationToFetchInMs =
+        TimeUnit.SECONDS.toMillis(
+            
iotdbConfig.getGeneralRegionAttributeSecurityServiceFailureDurationSecondsToFetch());
 
-    final Set<Integer> failedDataNodes =
-        clientHandler.getResponseMap().entrySet().stream()
-            .filter(
-                entry -> {
-                  final boolean failed =
-                      entry.getValue().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode();
-                  if (failed) {
-                    dataNodeId2FailureDurationAndTimesMap.compute(
-                        entry.getKey(),
-                        (k, v) -> {
-                          if (Objects.isNull(v)) {
-                            return new Pair<>(System.currentTimeMillis(), 1);
-                          }
-                          v.setRight(v.getRight() + 1);
-                          if (System.currentTimeMillis() - v.getLeft()
-                                  >= iotdbConfig
-                                      
.getGeneralRegionAttributeSecurityServiceFailureDurationSecondsToFetch()
-                              || v.getRight()
-                                  >= iotdbConfig
-                                      
.getGeneralRegionAttributeSecurityServiceFailureTimesToFetch()) {
-                            needFetch.set(true);
-                          }
-                          return v;
-                        });
-                  } else {
-                    
dataNodeId2FailureDurationAndTimesMap.remove(entry.getKey());
+    clientHandler.getResponseMap().entrySet().stream()
+        .filter(entry -> entry.getValue().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode())
+        .map(Map.Entry::getKey)
+        .forEach(dataNodeId2FailureDurationAndTimesMap::remove);
+
+    failedDataNodes.forEach(
+        dataNodeId ->
+            dataNodeId2FailureDurationAndTimesMap.compute(
+                dataNodeId,
+                (k, v) -> {
+                  if (Objects.isNull(v)) {
+                    return new Pair<>(System.currentTimeMillis(), 1);
+                  }
+                  v.setRight(v.getRight() + 1);
+                  if (System.currentTimeMillis() - v.getLeft() >= 
failureDurationToFetchInMs
+                      || v.getRight()
+                          >= iotdbConfig
+                              
.getGeneralRegionAttributeSecurityServiceFailureTimesToFetch()) {
+                    needFetch.set(true);
                   }
-                  return failed;
-                })
-            .map(Map.Entry::getKey)
-            .collect(Collectors.toSet());
+                  return v;
+                }));
 
     // Compute node shrinkage before failure removal in commit
     final Map<SchemaRegionId, Set<TDataNodeLocation>> result =
@@ -301,7 +299,8 @@ public class GeneralRegionAttributeSecurityService extends AbstractPeriodicalSer
     super(
         IoTDBThreadPoolFactory.newSingleThreadExecutor(
             ThreadName.GENERAL_REGION_ATTRIBUTE_SECURITY_SERVICE.getName()),
-        iotdbConfig.getGeneralRegionAttributeSecurityServiceIntervalSeconds() 
* 1000L);
+        TimeUnit.SECONDS.toMillis(
+            
iotdbConfig.getGeneralRegionAttributeSecurityServiceIntervalSeconds()));
   }
 
   private static final class GeneralRegionAttributeSecurityServiceHolder {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index 1f147070acb..a00b653d28a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -182,6 +182,7 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
     final TEndPoint endPoint = nodeLocationToEndPoint(targetNode);
     Client client = null;
     boolean dispatched = false;
+    AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler = null;
     try {
       if (!actionMap.containsKey(requestContext.getRequestType())) {
         throw new UnsupportedOperationException(
@@ -189,11 +190,10 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
                 + requestContext.getRequestType()
                 + ", please set it in 
AsyncRequestManager::initActionMapBuilder()");
       }
+      handler = buildHandler(requestContext, requestId, targetNode);
       client = clientManager.borrowClient(endPoint);
       adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
       Object req = requestContext.getRequest(requestId);
-      AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
-          buildHandler(requestContext, requestId, targetNode);
       Objects.requireNonNull(actionMap.get(requestContext.getRequestType()))
           .accept(req, client, handler);
       // After accept() returns, the async callback (onComplete/onError) takes 
over the
@@ -207,6 +207,21 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
           endPoint,
           e.getMessage(),
           retryCount);
+      if (handler != null) {
+        try {
+          handler.onError(e);
+        } catch (final Exception handlerException) {
+          LOGGER.warn(
+              "Failed to handle async request error for request type {} on 
node {}: {}",
+              requestContext.getRequestType(),
+              endPoint,
+              handlerException.getMessage(),
+              handlerException);
+          requestContext.getCountDownLatch().countDown();
+        }
+      } else {
+        requestContext.getCountDownLatch().countDown();
+      }
     } finally {
       if (!dispatched && client != null && clientManager instanceof 
ClientManager) {
         ((ClientManager<TEndPoint, Client>) 
clientManager).returnClient(endPoint, client);
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
new file mode 100644
index 00000000000..40b2bf9730b
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.request;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AsyncRequestManagerTest {
+
+  @Test
+  public void dispatchFailureShouldNotBlockRetryWithoutTimeout() throws 
Exception {
+    final TestAsyncRequestManager manager = new TestAsyncRequestManager();
+    final AsyncRequestContext<String, String, TestRequestType, 
TestNodeLocation> context =
+        new AsyncRequestContext<>(
+            TestRequestType.TEST,
+            "request",
+            Collections.singletonMap(1, new TestNodeLocation(new 
TEndPoint("localhost", 6667))));
+
+    final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+    try {
+      final Future<?> future =
+          executorService.submit(() -> manager.sendAsyncRequest(context, 2, 
null, true));
+      future.get(3, TimeUnit.SECONDS);
+    } finally {
+      executorService.shutdownNow();
+    }
+
+    Assert.assertEquals(2, manager.getBorrowAttempts());
+    Assert.assertEquals("borrow failed", context.getResponseMap().get(1));
+    Assert.assertEquals(Collections.singletonList(1), 
context.getRequestIndices());
+  }
+
+  @Test
+  public void handlerErrorFailureShouldNotEscapeDispatchFailure() throws 
Exception {
+    final TestAsyncRequestManager manager = new TestAsyncRequestManager(false, 
true, true);
+    final AsyncRequestContext<String, String, TestRequestType, 
TestNodeLocation> context =
+        new AsyncRequestContext<>(
+            TestRequestType.TEST,
+            "request",
+            Collections.singletonMap(1, new TestNodeLocation(new 
TEndPoint("localhost", 6667))));
+
+    final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+    try {
+      final Future<?> future =
+          executorService.submit(() -> manager.sendAsyncRequest(context, 1, 
null, true));
+      future.get(3, TimeUnit.SECONDS);
+    } finally {
+      executorService.shutdownNow();
+    }
+
+    Assert.assertEquals(1, manager.getBorrowAttempts());
+    Assert.assertTrue(context.getResponseMap().isEmpty());
+    Assert.assertEquals(Collections.singletonList(1), 
context.getRequestIndices());
+  }
+
+  private enum TestRequestType {
+    TEST
+  }
+
+  private static class TestNodeLocation {
+
+    private final TEndPoint endPoint;
+
+    private TestNodeLocation(final TEndPoint endPoint) {
+      this.endPoint = endPoint;
+    }
+  }
+
+  private static class TestAsyncRequestManager
+      extends AsyncRequestManager<TestRequestType, TestNodeLocation, Object> {
+
+    private final AtomicInteger borrowAttempts = new AtomicInteger();
+    private boolean failOnBorrow;
+    private boolean failOnDispatch;
+    private boolean failOnError;
+
+    private TestAsyncRequestManager() {
+      this(true, false, false);
+    }
+
+    private TestAsyncRequestManager(
+        final boolean failOnBorrow, final boolean failOnDispatch, final 
boolean failOnError) {
+      super(1);
+      this.failOnBorrow = failOnBorrow;
+      this.failOnDispatch = failOnDispatch;
+      this.failOnError = failOnError;
+    }
+
+    private int getBorrowAttempts() {
+      return borrowAttempts.get();
+    }
+
+    @Override
+    protected void initClientManager(final int 
selectorNumOfAsyncClientManager) {
+      clientManager =
+          new IClientManager<TEndPoint, Object>() {
+
+            @Override
+            public Object borrowClient(final TEndPoint node) throws 
ClientManagerException {
+              borrowAttempts.incrementAndGet();
+              if (failOnBorrow) {
+                throw new ClientManagerException("borrow failed");
+              }
+              return new Object();
+            }
+
+            @Override
+            public void clear(final TEndPoint node) {
+              // Do nothing
+            }
+
+            @Override
+            public void clearAll() {
+              // Do nothing
+            }
+
+            @Override
+            public void close() {
+              // Do nothing
+            }
+          };
+    }
+
+    @Override
+    protected void initActionMapBuilder() {
+      actionMapBuilder.put(
+          TestRequestType.TEST,
+          (request, client, handler) -> {
+            if (failOnDispatch) {
+              throw new RuntimeException("dispatch failed");
+            }
+            Assert.fail("The test client manager should fail before 
dispatch.");
+          });
+    }
+
+    @Override
+    protected TEndPoint nodeLocationToEndPoint(final TestNodeLocation 
location) {
+      return location.endPoint;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected AsyncRequestRPCHandler<?, TestRequestType, TestNodeLocation> 
buildHandler(
+        final AsyncRequestContext<?, ?, TestRequestType, TestNodeLocation> 
requestContext,
+        final int requestId,
+        final TestNodeLocation targetNode) {
+      return new TestAsyncRequestRPCHandler(
+          requestContext.getRequestType(),
+          requestId,
+          targetNode,
+          requestContext.getNodeLocationMap(),
+          (Map<Integer, String>) requestContext.getResponseMap(),
+          requestContext.getCountDownLatch(),
+          failOnError);
+    }
+  }
+
+  private static class TestAsyncRequestRPCHandler
+      extends AsyncRequestRPCHandler<String, TestRequestType, 
TestNodeLocation> {
+
+    private TestAsyncRequestRPCHandler(
+        final TestRequestType requestType,
+        final int requestId,
+        final TestNodeLocation targetNode,
+        final Map<Integer, TestNodeLocation> nodeLocationMap,
+        final Map<Integer, String> responseMap,
+        final CountDownLatch countDownLatch,
+        final boolean failOnError) {
+      super(requestType, requestId, targetNode, nodeLocationMap, responseMap, 
countDownLatch);
+      this.failOnError = failOnError;
+    }
+
+    private final boolean failOnError;
+
+    @Override
+    protected String generateFormattedTargetLocation(final TestNodeLocation 
location) {
+      return location.endPoint.toString();
+    }
+
+    @Override
+    public void onComplete(final String response) {
+      responseMap.put(requestId, response);
+      nodeLocationMap.remove(requestId);
+      countDownLatch.countDown();
+    }
+
+    @Override
+    public void onError(final Exception exception) {
+      if (failOnError) {
+        throw new RuntimeException("handler failed");
+      }
+      responseMap.put(requestId, exception.getMessage());
+      countDownLatch.countDown();
+    }
+  }
+}

Reply via email to