This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1df19137839 Fix pipe runtime meta push blocking user operations
(#17909)
1df19137839 is described below
commit 1df19137839b3e813f1441071469c0a65afe2844
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 09:38:22 2026 +0800
Fix pipe runtime meta push blocking user operations (#17909)
* Fix pipe runtime meta push blocking user operations
* Fix subscription runtime push blocking
* Harden runtime meta sync best effort
* Bound required metadata push waits
* Fix async request dispatch failure hang
* Harden async request failure handling
* Use safe time conversion for runtime meta sync intervals
* Harden subscription runtime meta sync timeouts
* Bound pipe heartbeat metadata collection waits
* Harden best-effort runtime metadata requests
* Fix heartbeat client pool
* Add tests and use real fail message & status
---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 12 +-
.../java/org/apache/iotdb/rpc/RpcUtilsTest.java | 25 ++
.../async/AsyncAINodeHeartbeatClientPool.java | 12 +-
.../async/AsyncConfigNodeHeartbeatClientPool.java | 12 +-
.../async/AsyncDataNodeHeartbeatClientPool.java | 22 +-
.../rpc/CheckTimeSeriesExistenceRPCHandler.java | 2 +-
.../rpc/CountPathsUsingTemplateRPCHandler.java | 2 +-
.../rpc/FetchSchemaBlackListRPCHandler.java | 2 +-
.../async/handlers/rpc/SchemaUpdateRPCHandler.java | 2 +-
.../CheckSchemaRegionUsingTemplateRPCHandler.java | 2 +-
.../manager/load/service/TopologyService.java | 15 +-
.../runtime/heartbeat/PipeHeartbeatScheduler.java | 13 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 332 +++++++++++++++++----
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 22 +-
.../runtime/PipeHandleLeaderChangeProcedure.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 2 +-
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 6 +-
.../AbstractOperateSubscriptionProcedure.java | 20 ++
.../runtime/CommitProgressSyncProcedure.java | 10 +-
.../runtime/ConsumerGroupMetaSyncProcedure.java | 10 +-
.../SubscriptionHandleLeaderChangeProcedure.java | 7 +-
.../topic/runtime/TopicMetaSyncProcedure.java | 9 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 13 +-
.../GeneralRegionAttributeSecurityService.java | 67 ++---
.../client/request/AsyncRequestManager.java | 19 +-
.../client/request/AsyncRequestManagerTest.java | 226 ++++++++++++++
26 files changed, 728 insertions(+), 138 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 1eb30967405..9c88bf3f8b7 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -151,8 +151,7 @@ public class RpcUtils {
}
public static void verifySuccess(List<TSStatus> statuses) throws BatchExecutionException {
- StringBuilder errMsgs =
- new StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": ");
+ StringBuilder errMsgs = new StringBuilder();
for (TSStatus status : statuses) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -160,7 +159,8 @@ public class RpcUtils {
}
}
if (errMsgs.length() > 0) {
- throw new BatchExecutionException(statuses, errMsgs.toString());
+ throw new BatchExecutionException(
+ statuses, TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ": " + errMsgs);
}
}
@@ -181,9 +181,9 @@ public class RpcUtils {
for (TSStatus subStatus : statusList) {
if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& subStatus.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- if (!msgSet.contains(status)) {
- errMsg.append(status).append("; ");
- msgSet.add(status);
+ if (!msgSet.contains(subStatus)) {
+ errMsg.append(subStatus).append("; ");
+ msgSet.add(subStatus);
}
}
}
diff --git
a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
index a6980276074..940c50453a2 100644
--- a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
+++ b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
@@ -19,11 +19,15 @@
package org.apache.iotdb.rpc;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+
import org.junit.Assert;
import org.junit.Test;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
public class RpcUtilsTest {
@@ -74,4 +78,25 @@ public class RpcUtilsTest {
Assert.assertFalse(RpcUtils.isSetSqlDialect("setsql_dialect =table"));
Assert.assertFalse(RpcUtils.isSetSqlDialect("set sql_dia"));
}
+
+ @Test
+ public void testVerifySuccessListAllowsSuccessfulStatuses() throws BatchExecutionException {
+ RpcUtils.verifySuccess(
+ Arrays.asList(
+ RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+ RpcUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND)));
+ }
+
+ @Test
+ public void testVerifySuccessListThrowsOnFailure() {
+ TSStatus failedStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "failed");
+
+ try {
+ RpcUtils.verifySuccess(Collections.singletonList(failedStatus));
+ Assert.fail("Expected BatchExecutionException");
+ } catch (BatchExecutionException e) {
+ Assert.assertEquals(Collections.singletonList(failedStatus), e.getStatusList());
+ Assert.assertTrue(e.getMessage().contains("failed"));
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index d15b45cbcd5..5c4b4c3175e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -54,8 +54,8 @@ public class AsyncAINodeHeartbeatClientPool {
client = clientManager.borrowClient(endPoint);
client.getAIHeartbeat(req, handler);
dispatched = true;
- } catch (Exception ignore) {
- // Just ignore
+ } catch (Exception e) {
+ handleError(handler, e);
} finally {
// After the async call is dispatched, the client's onComplete/onError callback is
// responsible for returning the client. If the RPC was not dispatched (exception
@@ -67,6 +67,14 @@ public class AsyncAINodeHeartbeatClientPool {
}
}
+ private void handleError(final AINodeHeartbeatHandler handler, final Exception e) {
+ try {
+ handler.onError(e);
+ } catch (final Exception ignore) {
+ // Ignore handler failures in heartbeat best-effort path.
+ }
+ }
+
private static class AsyncAINodeHeartbeatClientPoolHolder {
private static final AsyncAINodeHeartbeatClientPool INSTANCE =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index 5cf716d0dd1..ab30935753b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -55,8 +55,8 @@ public class AsyncConfigNodeHeartbeatClientPool {
client = clientManager.borrowClient(endPoint);
client.getConfigNodeHeartBeat(heartbeatReq, handler);
dispatched = true;
- } catch (Exception ignore) {
- // Just ignore
+ } catch (Exception e) {
+ handleError(handler, e);
} finally {
// After the async call is dispatched, the client's onComplete/onError callback is
// responsible for returning the client. If the RPC was not dispatched (exception
@@ -68,6 +68,14 @@ public class AsyncConfigNodeHeartbeatClientPool {
}
}
+ private void handleError(final ConfigNodeHeartbeatHandler handler, final
Exception e) {
+ try {
+ handler.onError(e);
+ } catch (final Exception ignore) {
+ // Ignore handler failures in heartbeat best-effort path.
+ }
+ }
+
private static class AsyncConfigNodeHeartbeatClientPoolHolder {
private static final AsyncConfigNodeHeartbeatClientPool INSTANCE =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index d31ec405e0c..516596a61b8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -56,8 +56,8 @@ public class AsyncDataNodeHeartbeatClientPool {
client = clientManager.borrowClient(endPoint);
client.getDataNodeHeartBeat(req, handler);
dispatched = true;
- } catch (Exception ignore) {
- // Just ignore
+ } catch (Exception e) {
+ handleError(handler, e);
} finally {
returnClientIfNotDispatched(endPoint, client, dispatched);
}
@@ -72,7 +72,7 @@ public class AsyncDataNodeHeartbeatClientPool {
client.writeAuditLog(req, handler);
dispatched = true;
} catch (Exception e) {
- // Just ignore
+ handleError(handler, e);
} finally {
returnClientIfNotDispatched(endPoint, client, dispatched);
}
@@ -89,6 +89,22 @@ public class AsyncDataNodeHeartbeatClientPool {
}
}
+ private void handleError(final DataNodeHeartbeatHandler handler, final
Exception e) {
+ try {
+ handler.onError(e);
+ } catch (final Exception ignore) {
+ // Ignore handler failures in heartbeat best-effort path.
+ }
+ }
+
+ private void handleError(final DataNodeWriteAuditLogHandler handler, final
Exception e) {
+ try {
+ handler.onError(e);
+ } catch (final Exception ignore) {
+ // Ignore handler failures in audit-log best-effort path.
+ }
+ }
+
private static class AsyncDataNodeHeartbeatClientPoolHolder {
private static final AsyncDataNodeHeartbeatClientPool INSTANCE =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
index 3a1c47e1746..918d206fc37 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
@@ -83,11 +83,11 @@ public class CheckTimeSeriesExistenceRPCHandler
+ e.getMessage();
LOGGER.error(errorMsg);
- countDownLatch.countDown();
TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
responseMap.put(requestId, resp);
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
index 45574e131e7..ea2b342f597 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
@@ -83,11 +83,11 @@ public class CountPathsUsingTemplateRPCHandler
+ e.getMessage();
LOGGER.error(errorMsg);
- countDownLatch.countDown();
TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
responseMap.put(requestId, resp);
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
index 44f8362585c..a0f8957bbb4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
@@ -83,11 +83,11 @@ public class FetchSchemaBlackListRPCHandler
+ e.getMessage();
LOGGER.error(errorMsg);
- countDownLatch.countDown();
TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
responseMap.put(requestId, resp);
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
index 84d848a484e..ea1de3b6274 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
@@ -77,10 +77,10 @@ public class SchemaUpdateRPCHandler extends
DataNodeTSStatusRPCHandler {
+ e.getMessage();
LOGGER.warn(errorMsg);
- countDownLatch.countDown();
responseMap.put(
requestId,
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
index 6d3d1bf5007..e2b1d7bc836 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
@@ -85,11 +85,11 @@ public class CheckSchemaRegionUsingTemplateRPCHandler
+ e.getMessage();
LOGGER.error(errorMsg);
- countDownLatch.countDown();
TCheckSchemaRegionUsingTemplateResp resp = new
TCheckSchemaRegionUsingTemplateResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
responseMap.put(requestId, resp);
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
index 6d334521ded..b99cbd00017 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.ratis.util.AwaitForSignal;
import org.apache.tsfile.utils.Pair;
@@ -72,6 +73,7 @@ import java.util.stream.Collectors;
public class TopologyService implements Runnable, IClusterStatusSubscriber {
private static final Logger LOGGER =
LoggerFactory.getLogger(TopologyService.class);
private static final int SAMPLING_WINDOW_SIZE = 100;
+ private static final int TOPOLOGY_PROBING_RETRY_NUM = 1;
private final ExecutorService topologyThread =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -222,7 +224,7 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
nodeLocations,
proberLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout);
+ .sendAsyncRequest(dataNodeAsyncRequestContext,
TOPOLOGY_PROBING_RETRY_NUM, timeout, true);
final List<TTestConnectionResult> results = new ArrayList<>();
dataNodeAsyncRequestContext
.getResponseMap()
@@ -360,15 +362,18 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
}
CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithTimeoutInMs(context,
CONF.getTopologyProbingBaseIntervalInMs());
+ .sendAsyncRequest(
+ context, TOPOLOGY_PROBING_RETRY_NUM,
CONF.getTopologyProbingBaseIntervalInMs(), true);
context
.getResponseMap()
.forEach(
(nodeId, resp) -> {
- Set<Integer> reachableSet =
- computedTopology.getOrDefault(nodeId,
Collections.emptySet());
- lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
+ if (resp.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ Set<Integer> reachableSet =
+ computedTopology.getOrDefault(nodeId,
Collections.emptySet());
+ lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
+ }
});
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 91d36f4e441..b9276880cea 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -52,6 +52,7 @@ public class PipeHeartbeatScheduler {
PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled();
private static final long HEARTBEAT_INTERVAL_SECONDS =
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
+ private static final int PIPE_HEARTBEAT_RETRY_NUM = 1;
private static final ScheduledExecutorService HEARTBEAT_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
@@ -102,12 +103,8 @@ public class PipeHeartbeatScheduler {
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_HEARTBEAT, request,
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
- * 1000L
- * 2
- / 3);
+ .sendAsyncRequest(
+ clientHandler, PIPE_HEARTBEAT_RETRY_NUM,
getPipeHeartbeatRequestTimeoutInMs(), true);
clientHandler
.getResponseMap()
.forEach(
@@ -137,6 +134,10 @@ public class PipeHeartbeatScheduler {
}
}
+ private static long getPipeHeartbeatRequestTimeoutInMs() {
+ return TimeUnit.SECONDS.toMillis(HEARTBEAT_INTERVAL_SECONDS) * 2 / 3;
+ }
+
public synchronized void stop() {
if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) {
heartbeatFuture.cancel(false);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 4105b8ecf5c..ab4f367df92 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
@@ -110,6 +111,8 @@ public class ConfigNodeProcedureEnv {
private static final Logger LOG =
LoggerFactory.getLogger(ConfigNodeProcedureEnv.class);
+ private static final int RUNTIME_META_PUSH_RETRY_NUM = 1;
+
/** Add or remove node lock. */
private final LockQueue nodeLock = new LockQueue();
@@ -655,10 +658,23 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushPipeMetaReq, TPushPipeMetaResp>
clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPushPipeMetaResp> pushAllPipeMetaToDataNodesBestEffort(
+ List<ByteBuffer> pipeMetaBinaryList) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPushPipeMetaReq request = new
TPushPipeMetaReq().setPipeMetas(pipeMetaBinaryList);
+
+ final DataNodeAsyncRequestContext<TPushPipeMetaReq, TPushPipeMetaResp>
clientHandler =
+ new DataNodeAsyncRequestContext<>(
+ CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request,
dataNodeLocationMap);
+ final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -670,10 +686,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushSinglePipeMetaReq,
TPushPipeMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -686,10 +701,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushSinglePipeMetaReq,
TPushPipeMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -703,10 +717,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushMultiPipeMetaReq,
TPushPipeMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -719,10 +732,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushMultiPipeMetaReq,
TPushPipeMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -735,10 +747,23 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushTopicMetaReq, TPushTopicMetaResp>
clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPushTopicMetaResp>
pushAllTopicMetaToDataNodesBestEffort(
+ List<ByteBuffer> topicMetaBinaryList) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPushTopicMetaReq request = new
TPushTopicMetaReq().setTopicMetas(topicMetaBinaryList);
+
+ final DataNodeAsyncRequestContext<TPushTopicMetaReq, TPushTopicMetaResp>
clientHandler =
+ new DataNodeAsyncRequestContext<>(
+ CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request,
dataNodeLocationMap);
+ final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -750,7 +775,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushSingleTopicMetaReq,
TPushTopicMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseList().stream()
.map(TPushTopicMetaResp::getStatus)
.collect(Collectors.toList());
@@ -765,7 +792,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushSingleTopicMetaReq,
TPushTopicMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request,
dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseList().stream()
.map(TPushTopicMetaResp::getStatus)
.collect(Collectors.toList());
@@ -781,10 +810,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushMultiTopicMetaReq,
TPushTopicMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -797,10 +825,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushMultiTopicMetaReq,
TPushTopicMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -815,10 +842,25 @@ public class ConfigNodeProcedureEnv {
clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPushConsumerGroupMetaResp>
pushAllConsumerGroupMetaToDataNodesBestEffort(
+ List<ByteBuffer> consumerGroupMetaBinaryList) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPushConsumerGroupMetaReq request =
+ new
TPushConsumerGroupMetaReq().setConsumerGroupMetas(consumerGroupMetaBinaryList);
+
+ final DataNodeAsyncRequestContext<TPushConsumerGroupMetaReq,
TPushConsumerGroupMetaResp>
+ clientHandler =
+ new DataNodeAsyncRequestContext<>(
+ CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request,
dataNodeLocationMap);
+ final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+ fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -834,7 +876,9 @@ public class ConfigNodeProcedureEnv {
CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
request,
dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseList().stream()
.map(TPushConsumerGroupMetaResp::getStatus)
.collect(Collectors.toList());
@@ -852,7 +896,9 @@ public class ConfigNodeProcedureEnv {
CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
request,
dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseList().stream()
.map(TPushConsumerGroupMetaResp::getStatus)
.collect(Collectors.toList());
@@ -867,10 +913,23 @@ public class ConfigNodeProcedureEnv {
clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PULL_COMMIT_PROGRESS, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPullCommitProgressResponses(clientHandler, timeoutInMs);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPullCommitProgressResp>
pullCommitProgressFromDataNodesBestEffort() {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPullCommitProgressReq request = new TPullCommitProgressReq();
+
+ final DataNodeAsyncRequestContext<TPullCommitProgressReq,
TPullCommitProgressResp>
+ clientHandler =
+ new DataNodeAsyncRequestContext<>(
+ CnToDnAsyncRequestType.PULL_COMMIT_PROGRESS, request,
dataNodeLocationMap);
+ final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+ fillMissingPullCommitProgressResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -884,8 +943,12 @@ public class ConfigNodeProcedureEnv {
final Set<Integer> readableDataNodeIds =
getLoadManager().filterDataNodeThroughStatus(NodeStatus::isReadable).stream()
.collect(Collectors.toSet());
- final DataNodeAsyncRequestContext<TPushSubscriptionRuntimeReq, TSStatus>
clientHandler =
- new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME);
+ final DataNodeAsyncRequestContext<TPushSubscriptionRuntimeReq, TSStatus>
+ readableDataNodeClientHandler =
+ new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME);
+ final DataNodeAsyncRequestContext<TPushSubscriptionRuntimeReq, TSStatus>
+ unreadableDataNodeClientHandler =
+ new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME);
dataNodeLocationMap.forEach(
(dataNodeId, dataNodeLocation) -> {
@@ -919,16 +982,25 @@ public class ConfigNodeProcedureEnv {
preferredWriterNodeId == dataNodeId,
new ArrayList<>(activeWriterNodeIds)));
});
+ final DataNodeAsyncRequestContext<TPushSubscriptionRuntimeReq,
TSStatus> clientHandler =
+ readableDataNodeIds.contains(dataNodeId)
+ ? readableDataNodeClientHandler
+ : unreadableDataNodeClientHandler;
clientHandler.putNodeLocation(dataNodeId, dataNodeLocation);
clientHandler.putRequest(
dataNodeId, new
TPushSubscriptionRuntimeReq().setRuntimeStates(runtimeStates));
});
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
- return clientHandler.getResponseMap();
+ final long readableTimeoutInMs =
sendRequiredRuntimeMetaRequest(readableDataNodeClientHandler);
+ fillMissingSubscriptionRuntimeResponses(readableDataNodeClientHandler,
readableTimeoutInMs);
+ final long unreadableTimeoutInMs =
+ sendBestEffortRuntimeMetaRequest(unreadableDataNodeClientHandler);
+ fillMissingSubscriptionRuntimeResponses(unreadableDataNodeClientHandler,
unreadableTimeoutInMs);
+
+ final Map<Integer, TSStatus> responseMap =
+ new HashMap<>(readableDataNodeClientHandler.getResponseMap());
+ responseMap.putAll(unreadableDataNodeClientHandler.getResponseMap());
+ return responseMap;
}
private boolean isRuntimeActiveWriterNode(final int dataNodeId) {
@@ -937,6 +1009,164 @@ public class ConfigNodeProcedureEnv {
&& getLoadManager().getNodeStatus(dataNodeId) != NodeStatus.Removing;
}
+ private static long sendBestEffortRuntimeMetaRequest(
+ final DataNodeAsyncRequestContext<?, ?> clientHandler) {
+ final long timeoutInMs = getRuntimeMetaPushTimeoutInMs();
+ sendRuntimeMetaRequest(clientHandler, true, timeoutInMs);
+ return timeoutInMs;
+ }
+
+ private static long sendRequiredRuntimeMetaRequest(
+ final DataNodeAsyncRequestContext<?, ?> clientHandler) {
+ final long timeoutInMs = getRuntimeMetaPushTimeoutInMs();
+ sendRuntimeMetaRequest(clientHandler, false, timeoutInMs);
+ return timeoutInMs;
+ }
+
+ private static void sendRequiredMetadataRequest(
+ final DataNodeAsyncRequestContext<?, ?> clientHandler, final long
timeoutInMs) {
+ sendRuntimeMetaRequest(clientHandler, false, timeoutInMs);
+ }
+
+ private static void sendRuntimeMetaRequest(
+ final DataNodeAsyncRequestContext<?, ?> clientHandler, final boolean
keepSilent) {
+ sendRuntimeMetaRequest(clientHandler, keepSilent,
getRuntimeMetaPushTimeoutInMs());
+ }
+
+ private static void sendRuntimeMetaRequest(
+ final DataNodeAsyncRequestContext<?, ?> clientHandler,
+ final boolean keepSilent,
+ final long timeoutInMs) {
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(clientHandler, RUNTIME_META_PUSH_RETRY_NUM,
timeoutInMs, keepSilent);
+ }
+
+ private static void fillMissingPipePushMetaResponses(
+ final DataNodeAsyncRequestContext<?, TPushPipeMetaResp> clientHandler,
+ final long timeoutInMs) {
+ clientHandler
+ .getRequestIndices()
+ .forEach(
+ dataNodeId ->
+ clientHandler
+ .getResponseMap()
+ .putIfAbsent(
+ dataNodeId,
+ new TPushPipeMetaResp(
+ createMetadataTimeoutStatus(
+ clientHandler.getRequestType(),
+ dataNodeId,
+ TSStatusCode.PIPE_PUSH_META_ERROR,
+ timeoutInMs))));
+ }
+
+ private static void fillMissingTopicPushMetaResponses(
+ final DataNodeAsyncRequestContext<?, TPushTopicMetaResp> clientHandler,
+ final long timeoutInMs) {
+ clientHandler
+ .getRequestIndices()
+ .forEach(
+ dataNodeId ->
+ clientHandler
+ .getResponseMap()
+ .putIfAbsent(
+ dataNodeId,
+ new TPushTopicMetaResp(
+ createMetadataTimeoutStatus(
+ clientHandler.getRequestType(),
+ dataNodeId,
+ TSStatusCode.TOPIC_PUSH_META_ERROR,
+ timeoutInMs))));
+ }
+
+ private static void fillMissingConsumerGroupPushMetaResponses(
+ final DataNodeAsyncRequestContext<?, TPushConsumerGroupMetaResp>
clientHandler,
+ final long timeoutInMs) {
+ clientHandler
+ .getRequestIndices()
+ .forEach(
+ dataNodeId ->
+ clientHandler
+ .getResponseMap()
+ .putIfAbsent(
+ dataNodeId,
+ new TPushConsumerGroupMetaResp(
+ createMetadataTimeoutStatus(
+ clientHandler.getRequestType(),
+ dataNodeId,
+ TSStatusCode.CONSUMER_PUSH_META_ERROR,
+ timeoutInMs))));
+ }
+
+ private static void fillMissingPullCommitProgressResponses(
+ final DataNodeAsyncRequestContext<?, TPullCommitProgressResp>
clientHandler,
+ final long timeoutInMs) {
+ clientHandler
+ .getRequestIndices()
+ .forEach(
+ dataNodeId ->
+ clientHandler
+ .getResponseMap()
+ .putIfAbsent(
+ dataNodeId,
+ new TPullCommitProgressResp(
+ createMetadataTimeoutStatus(
+ clientHandler.getRequestType(),
+ dataNodeId,
+ TSStatusCode.EXECUTE_STATEMENT_ERROR,
+ timeoutInMs))));
+ }
+
+ private static void fillMissingSubscriptionRuntimeResponses(
+ final DataNodeAsyncRequestContext<?, TSStatus> clientHandler, final long
timeoutInMs) {
+ clientHandler
+ .getRequestIndices()
+ .forEach(
+ dataNodeId ->
+ clientHandler
+ .getResponseMap()
+ .putIfAbsent(
+ dataNodeId,
+ createMetadataTimeoutStatus(
+ clientHandler.getRequestType(),
+ dataNodeId,
+ TSStatusCode.EXECUTE_STATEMENT_ERROR,
+ timeoutInMs)));
+ }
+
+ private static TSStatus createMetadataTimeoutStatus(
+ final CnToDnAsyncRequestType requestType,
+ final int dataNodeId,
+ final TSStatusCode statusCode,
+ final long timeoutInMs) {
+ return new TSStatus(statusCode.getStatusCode())
+ .setMessage(
+ String.format(
+ "Failed to %s on DataNode %s before metadata request timeout
%sms",
+ requestType, dataNodeId, timeoutInMs));
+ }
+
+ private static long getRuntimeMetaPushTimeoutInMs() {
+ return TimeUnit.SECONDS.toMillis(
+
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta())
+ * 2
+ / 3;
+ }
+
+ private static long getRequiredPipeMetadataRequestTimeoutInMs() {
+ return TimeUnit.MINUTES.toMillis(
+ PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes())
+ * 2
+ / 3;
+ }
+
+ private static long getRequiredSubscriptionMetadataRequestTimeoutInMs() {
+ return TimeUnit.MINUTES.toMillis(
+
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+ * 2
+ / 3;
+ }
+
public LockQueue getNodeLock() {
return nodeLock;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 5679b36ffc5..0e72737f265 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -487,10 +487,11 @@ public abstract class AbstractOperatePipeProcedureV2
if (resp.getStatus().getCode() ==
TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) {
if (!resp.isSetExceptionMessages()) {
+ final String statusMessage = resp.getStatus().getMessage();
exceptionMessageBuilder.append(
String.format(
- "DataNodeId: %s, Message: Internal error while processing
pushPipeMeta on dataNodes.",
- dataNodeId));
+ "DataNodeId: %s, Message: Internal error while processing
pushPipeMeta on dataNodes.%s",
+ dataNodeId, statusMessage == null ? "" : " " +
statusMessage));
continue;
}
@@ -538,6 +539,23 @@ public abstract class AbstractOperatePipeProcedureV2
}
}
+ protected Map<Integer, TPushPipeMetaResp>
pushPipeMetaToDataNodesBestEffortAndGetResponse(
+ ConfigNodeProcedureEnv env) throws IOException {
+ final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+ for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
+
pipeMetaBinaryList.add(copyAndFilterOutNonWorkingDataRegionPipeTasks(pipeMeta).serialize());
+ }
+ return env.pushAllPipeMetaToDataNodesBestEffort(pipeMetaBinaryList);
+ }
+
+ protected void pushPipeMetaToDataNodesBestEffort(ConfigNodeProcedureEnv env)
{
+ try {
+ pushPipeMetaToDataNodesBestEffortAndGetResponse(env);
+ } catch (Exception e) {
+
LOGGER.info(ProcedureMessages.FAILED_TO_PUSH_PIPE_META_LIST_TO_DATA_NODES_WILL,
e);
+ }
+ }
+
/**
* Pushing one pipeMeta to all the dataNodes, forcing an update to the
pipe's runtime state.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 93653923375..a8399b281b4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -124,7 +124,7 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
LOGGER::info,
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
- pushPipeMetaToDataNodesIgnoreException(env);
+ pushPipeMetaToDataNodesBestEffort(env);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 394f7e2fa9f..08468f3d04d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -140,7 +140,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
return;
}
- pushPipeMetaToDataNodesIgnoreException(env);
+ pushPipeMetaToDataNodesBestEffort(env);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 072555146c1..d371b7e9b20 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -52,6 +52,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -65,7 +66,8 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeMetaSyncProcedure.class);
private static final long MIN_EXECUTION_INTERVAL_MS =
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 *
1000 / 2;
+
TimeUnit.MINUTES.toMillis(PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes())
+ / 2;
// No need to serialize this field
private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
@@ -205,7 +207,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.debug(ProcedureMessages.PIPEMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES);
- Map<Integer, TPushPipeMetaResp> respMap = pushPipeMetaToDataNodes(env);
+ Map<Integer, TPushPipeMetaResp> respMap =
pushPipeMetaToDataNodesBestEffortAndGetResponse(env);
if (pipeTaskInfo.get().recordDataNodePushPipeMetaExceptions(respMap)) {
throw new PipeException(
String.format(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 9b0bd21750d..d6fa0de5d20 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -350,6 +350,16 @@ public abstract class AbstractOperateSubscriptionProcedure
return env.pushAllTopicMetaToDataNodes(topicMetaBinaryList);
}
+ protected Map<Integer, TPushTopicMetaResp>
pushTopicMetaToDataNodesBestEffort(
+ ConfigNodeProcedureEnv env) throws IOException {
+ final List<ByteBuffer> topicMetaBinaryList = new ArrayList<>();
+ for (TopicMeta topicMeta : subscriptionInfo.get().getAllTopicMeta()) {
+ topicMetaBinaryList.add(topicMeta.serialize());
+ }
+
+ return env.pushAllTopicMetaToDataNodesBestEffort(topicMetaBinaryList);
+ }
+
public static boolean pushTopicMetaHasException(Map<Integer,
TPushTopicMetaResp> respMap) {
for (TPushTopicMetaResp resp : respMap.values()) {
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -376,6 +386,16 @@ public abstract class AbstractOperateSubscriptionProcedure
return
env.pushAllConsumerGroupMetaToDataNodes(consumerGroupMetaBinaryList);
}
+ protected Map<Integer, TPushConsumerGroupMetaResp>
pushConsumerGroupMetaToDataNodesBestEffort(
+ ConfigNodeProcedureEnv env) throws IOException {
+ final List<ByteBuffer> consumerGroupMetaBinaryList = new ArrayList<>();
+ for (ConsumerGroupMeta consumerGroupMeta :
subscriptionInfo.get().getAllConsumerGroupMeta()) {
+ consumerGroupMetaBinaryList.add(consumerGroupMeta.serialize());
+ }
+
+ return
env.pushAllConsumerGroupMetaToDataNodesBestEffort(consumerGroupMetaBinaryList);
+ }
+
public static boolean pushConsumerGroupMetaHasException(
Map<Integer, TPushConsumerGroupMetaResp> respMap) {
for (TPushConsumerGroupMetaResp resp : respMap.values()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java
index e9b3056e662..348412af106 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/CommitProgressSyncProcedure.java
@@ -20,7 +20,7 @@
package
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.CommitProgressHandleMetaChangePlan;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -47,6 +47,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -59,7 +60,9 @@ public class CommitProgressSyncProcedure extends
AbstractOperateSubscriptionProc
private static final Logger LOGGER =
LoggerFactory.getLogger(CommitProgressSyncProcedure.class);
private static final long MIN_EXECUTION_INTERVAL_MS =
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 *
1000 / 2;
+ TimeUnit.MINUTES.toMillis(
+
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+ / 2;
private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
public CommitProgressSyncProcedure() {
@@ -106,7 +109,8 @@ public class CommitProgressSyncProcedure extends
AbstractOperateSubscriptionProc
LOGGER.info("CommitProgressSyncProcedure:
executeFromOperateOnConfigNodes");
// 1. Pull commit progress from all DataNodes
- final Map<Integer, TPullCommitProgressResp> respMap =
env.pullCommitProgressFromDataNodes();
+ final Map<Integer, TPullCommitProgressResp> respMap =
+ env.pullCommitProgressFromDataNodesBestEffort();
// 2. Merge all DataNode responses with existing progress using Math::max
final Map<String, RegionProgress> mergedRegionProgress =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index 20a1173ac60..8e14b26e260 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -20,7 +20,7 @@
package
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
@@ -44,6 +44,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -53,7 +54,9 @@ public class ConsumerGroupMetaSyncProcedure extends
AbstractOperateSubscriptionP
LoggerFactory.getLogger(ConsumerGroupMetaSyncProcedure.class);
private static final long MIN_EXECUTION_INTERVAL_MS =
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 *
1000 / 2;
+ TimeUnit.MINUTES.toMillis(
+
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+ / 2;
// No need to serialize this field
private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
@@ -129,7 +132,8 @@ public class ConsumerGroupMetaSyncProcedure extends
AbstractOperateSubscriptionP
throws SubscriptionException, IOException {
LOGGER.info(ProcedureMessages.CONSUMERGROUPMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES);
- Map<Integer, TPushConsumerGroupMetaResp> respMap =
pushConsumerGroupMetaToDataNodes(env);
+ Map<Integer, TPushConsumerGroupMetaResp> respMap =
+ pushConsumerGroupMetaToDataNodesBestEffort(env);
if (pushConsumerGroupMetaHasException(respMap)) {
throw new SubscriptionException(
String.format(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
index 5337b719d20..09414b0e3a8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java
@@ -104,7 +104,8 @@ public class SubscriptionHandleLeaderChangeProcedure
extends AbstractOperateSubs
throws SubscriptionException {
LOGGER.info("SubscriptionHandleLeaderChangeProcedure:
executeFromOperateOnConfigNodes");
- final Map<Integer, TPullCommitProgressResp> respMap =
env.pullCommitProgressFromDataNodes();
+ final Map<Integer, TPullCommitProgressResp> respMap =
+ env.pullCommitProgressFromDataNodesBestEffort();
final Map<String, RegionProgress> mergedRegionProgress =
deserializeRegionProgressMap(
subscriptionInfo.get().getCommitProgressKeeper().getAllRegionProgress());
@@ -158,7 +159,7 @@ public class SubscriptionHandleLeaderChangeProcedure
extends AbstractOperateSubs
throws SubscriptionException, IOException {
LOGGER.info("SubscriptionHandleLeaderChangeProcedure:
executeFromOperateOnDataNodes");
- final Map<Integer, TPushTopicMetaResp> topicRespMap =
pushTopicMetaToDataNodes(env);
+ final Map<Integer, TPushTopicMetaResp> topicRespMap =
pushTopicMetaToDataNodesBestEffort(env);
topicRespMap.forEach(
(dataNodeId, resp) -> {
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -170,7 +171,7 @@ public class SubscriptionHandleLeaderChangeProcedure
extends AbstractOperateSubs
});
final Map<Integer, TPushConsumerGroupMetaResp> consumerGroupRespMap =
- pushConsumerGroupMetaToDataNodes(env);
+ pushConsumerGroupMetaToDataNodesBestEffort(env);
consumerGroupRespMap.forEach(
(dataNodeId, resp) -> {
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index b7432a2c588..d1ba988db63 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
@@ -44,6 +44,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -52,7 +53,9 @@ public class TopicMetaSyncProcedure extends
AbstractOperateSubscriptionProcedure
private static final Logger LOGGER =
LoggerFactory.getLogger(TopicMetaSyncProcedure.class);
private static final long MIN_EXECUTION_INTERVAL_MS =
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 *
1000 / 2;
+ TimeUnit.MINUTES.toMillis(
+
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+ / 2;
// No need to serialize this field
private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
@@ -129,7 +132,7 @@ public class TopicMetaSyncProcedure extends
AbstractOperateSubscriptionProcedure
throws SubscriptionException, IOException {
LOGGER.info(ProcedureMessages.TOPICMETASYNCPROCEDURE_EXECUTEFROMOPERATEONDATANODES);
- Map<Integer, TPushTopicMetaResp> respMap = pushTopicMetaToDataNodes(env);
+ Map<Integer, TPushTopicMetaResp> respMap =
pushTopicMetaToDataNodesBestEffort(env);
if (pushTopicMetaHasException(respMap)) {
throw new SubscriptionException(
String.format(ProcedureMessages.FAILED_TO_PUSH_TOPIC_META_TO_DATANODES_DETAILS,
respMap));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2af869ab2f9..c588c94c7b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -435,6 +435,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private static final long TEST_CONNECTION_TIMEOUT_MS =
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
+ private static final int TEST_CONNECTION_RETRY_NUM = 1;
private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR =
IoTDBThreadPoolFactory.newFixedThreadPool(
@@ -2222,7 +2223,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
DnToCnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToCnRequestType,
TConfigNodeLocation> handler) ->
DnToCnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithTimeoutInMs(handler,
TEST_CONNECTION_TIMEOUT_MS));
+ .sendAsyncRequest(
+ handler, TEST_CONNECTION_RETRY_NUM,
TEST_CONNECTION_TIMEOUT_MS, true));
}
private List<TTestConnectionResult>
testAllDataNodeConnectionInHeartbeatChannel(
@@ -2248,7 +2250,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
DnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithTimeoutInMs(handler,
TEST_CONNECTION_TIMEOUT_MS));
+ .sendAsyncRequest(
+ handler, TEST_CONNECTION_RETRY_NUM,
TEST_CONNECTION_TIMEOUT_MS, true));
}
private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -2261,7 +2264,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
DataNodeMPPServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithTimeoutInMs(handler,
TEST_CONNECTION_TIMEOUT_MS));
+ .sendAsyncRequest(
+ handler, TEST_CONNECTION_RETRY_NUM,
TEST_CONNECTION_TIMEOUT_MS, true));
}
private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -2274,7 +2278,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
DataNodeExternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithTimeoutInMs(handler,
TEST_CONNECTION_TIMEOUT_MS));
+ .sendAsyncRequest(
+ handler, TEST_CONNECTION_RETRY_NUM,
TEST_CONNECTION_TIMEOUT_MS, true));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
index db23436e520..4f668a19075 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
@@ -67,6 +67,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -77,6 +78,7 @@ public class GeneralRegionAttributeSecurityService extends
AbstractPeriodicalSer
LoggerFactory.getLogger(GeneralRegionAttributeSecurityService.class);
private static final IoTDBConfig iotdbConfig =
IoTDBDescriptor.getInstance().getConfig();
+ private static final int ATTRIBUTE_UPDATE_RETRY_NUM = 1;
private final Map<Integer, Pair<Long, Integer>>
dataNodeId2FailureDurationAndTimesMap =
new HashMap<>();
private final Set<ISchemaRegion> regionLeaders =
@@ -214,46 +216,42 @@ public class GeneralRegionAttributeSecurityService
extends AbstractPeriodicalSer
ByteBuffer.wrap(bytes)));
}));
- DnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithTimeoutInMs(
- clientHandler,
+ final long timeoutInMs =
+ TimeUnit.SECONDS.toMillis(
IoTDBDescriptor.getInstance()
.getConfig()
.getGeneralRegionAttributeSecurityServiceTimeoutSeconds());
+ DnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(clientHandler, ATTRIBUTE_UPDATE_RETRY_NUM,
timeoutInMs);
final AtomicBoolean needFetch = new AtomicBoolean(false);
+ final Set<Integer> failedDataNodes = new
HashSet<>(clientHandler.getRequestIndices());
+ final long failureDurationToFetchInMs =
+ TimeUnit.SECONDS.toMillis(
+
iotdbConfig.getGeneralRegionAttributeSecurityServiceFailureDurationSecondsToFetch());
- final Set<Integer> failedDataNodes =
- clientHandler.getResponseMap().entrySet().stream()
- .filter(
- entry -> {
- final boolean failed =
- entry.getValue().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode();
- if (failed) {
- dataNodeId2FailureDurationAndTimesMap.compute(
- entry.getKey(),
- (k, v) -> {
- if (Objects.isNull(v)) {
- return new Pair<>(System.currentTimeMillis(), 1);
- }
- v.setRight(v.getRight() + 1);
- if (System.currentTimeMillis() - v.getLeft()
- >= iotdbConfig
-
.getGeneralRegionAttributeSecurityServiceFailureDurationSecondsToFetch()
- || v.getRight()
- >= iotdbConfig
-
.getGeneralRegionAttributeSecurityServiceFailureTimesToFetch()) {
- needFetch.set(true);
- }
- return v;
- });
- } else {
-
dataNodeId2FailureDurationAndTimesMap.remove(entry.getKey());
+ clientHandler.getResponseMap().entrySet().stream()
+ .filter(entry -> entry.getValue().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode())
+ .map(Map.Entry::getKey)
+ .forEach(dataNodeId2FailureDurationAndTimesMap::remove);
+
+ failedDataNodes.forEach(
+ dataNodeId ->
+ dataNodeId2FailureDurationAndTimesMap.compute(
+ dataNodeId,
+ (k, v) -> {
+ if (Objects.isNull(v)) {
+ return new Pair<>(System.currentTimeMillis(), 1);
+ }
+ v.setRight(v.getRight() + 1);
+ if (System.currentTimeMillis() - v.getLeft() >=
failureDurationToFetchInMs
+ || v.getRight()
+ >= iotdbConfig
+
.getGeneralRegionAttributeSecurityServiceFailureTimesToFetch()) {
+ needFetch.set(true);
}
- return failed;
- })
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
+ return v;
+ }));
// Compute node shrinkage before failure removal in commit
final Map<SchemaRegionId, Set<TDataNodeLocation>> result =
@@ -301,7 +299,8 @@ public class GeneralRegionAttributeSecurityService extends AbstractPeriodicalSer
super(
IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.GENERAL_REGION_ATTRIBUTE_SECURITY_SERVICE.getName()),
- iotdbConfig.getGeneralRegionAttributeSecurityServiceIntervalSeconds() * 1000L);
+ TimeUnit.SECONDS.toMillis(
+ iotdbConfig.getGeneralRegionAttributeSecurityServiceIntervalSeconds()));
}
private static final class GeneralRegionAttributeSecurityServiceHolder {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index 1f147070acb..a00b653d28a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -182,6 +182,7 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
final TEndPoint endPoint = nodeLocationToEndPoint(targetNode);
Client client = null;
boolean dispatched = false;
+ AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler = null;
try {
if (!actionMap.containsKey(requestContext.getRequestType())) {
throw new UnsupportedOperationException(
@@ -189,11 +190,10 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
+ requestContext.getRequestType()
+ ", please set it in AsyncRequestManager::initActionMapBuilder()");
}
+ handler = buildHandler(requestContext, requestId, targetNode);
client = clientManager.borrowClient(endPoint);
adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
Object req = requestContext.getRequest(requestId);
- AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
- buildHandler(requestContext, requestId, targetNode);
Objects.requireNonNull(actionMap.get(requestContext.getRequestType()))
.accept(req, client, handler);
// After accept() returns, the async callback (onComplete/onError) takes over the
@@ -207,6 +207,21 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
endPoint,
e.getMessage(),
retryCount);
+ if (handler != null) {
+ try {
+ handler.onError(e);
+ } catch (final Exception handlerException) {
+ LOGGER.warn(
+ "Failed to handle async request error for request type {} on node {}: {}",
+ requestContext.getRequestType(),
+ endPoint,
+ handlerException.getMessage(),
+ handlerException);
+ requestContext.getCountDownLatch().countDown();
+ }
+ } else {
+ requestContext.getCountDownLatch().countDown();
+ }
} finally {
if (!dispatched && client != null && clientManager instanceof ClientManager) {
((ClientManager<TEndPoint, Client>) clientManager).returnClient(endPoint, client);
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
new file mode 100644
index 00000000000..40b2bf9730b
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.request;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AsyncRequestManagerTest {
+
+ @Test
+ public void dispatchFailureShouldNotBlockRetryWithoutTimeout() throws Exception {
+ final TestAsyncRequestManager manager = new TestAsyncRequestManager();
+ final AsyncRequestContext<String, String, TestRequestType, TestNodeLocation> context =
+ new AsyncRequestContext<>(
+ TestRequestType.TEST,
+ "request",
+ Collections.singletonMap(1, new TestNodeLocation(new TEndPoint("localhost", 6667))));
+
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ try {
+ final Future<?> future =
+ executorService.submit(() -> manager.sendAsyncRequest(context, 2, null, true));
+ future.get(3, TimeUnit.SECONDS);
+ } finally {
+ executorService.shutdownNow();
+ }
+
+ Assert.assertEquals(2, manager.getBorrowAttempts());
+ Assert.assertEquals("borrow failed", context.getResponseMap().get(1));
+ Assert.assertEquals(Collections.singletonList(1), context.getRequestIndices());
+ }
+
+ @Test
+ public void handlerErrorFailureShouldNotEscapeDispatchFailure() throws Exception {
+ final TestAsyncRequestManager manager = new TestAsyncRequestManager(false, true, true);
+ final AsyncRequestContext<String, String, TestRequestType, TestNodeLocation> context =
+ new AsyncRequestContext<>(
+ TestRequestType.TEST,
+ "request",
+ Collections.singletonMap(1, new TestNodeLocation(new TEndPoint("localhost", 6667))));
+
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ try {
+ final Future<?> future =
+ executorService.submit(() -> manager.sendAsyncRequest(context, 1, null, true));
+ future.get(3, TimeUnit.SECONDS);
+ } finally {
+ executorService.shutdownNow();
+ }
+
+ Assert.assertEquals(1, manager.getBorrowAttempts());
+ Assert.assertTrue(context.getResponseMap().isEmpty());
+ Assert.assertEquals(Collections.singletonList(1), context.getRequestIndices());
+ }
+
+ private enum TestRequestType {
+ TEST
+ }
+
+ private static class TestNodeLocation {
+
+ private final TEndPoint endPoint;
+
+ private TestNodeLocation(final TEndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
+ }
+
+ private static class TestAsyncRequestManager
+ extends AsyncRequestManager<TestRequestType, TestNodeLocation, Object> {
+
+ private final AtomicInteger borrowAttempts = new AtomicInteger();
+ private boolean failOnBorrow;
+ private boolean failOnDispatch;
+ private boolean failOnError;
+
+ private TestAsyncRequestManager() {
+ this(true, false, false);
+ }
+
+ private TestAsyncRequestManager(
+ final boolean failOnBorrow, final boolean failOnDispatch, final boolean failOnError) {
+ super(1);
+ this.failOnBorrow = failOnBorrow;
+ this.failOnDispatch = failOnDispatch;
+ this.failOnError = failOnError;
+ }
+
+ private int getBorrowAttempts() {
+ return borrowAttempts.get();
+ }
+
+ @Override
+ protected void initClientManager(final int selectorNumOfAsyncClientManager) {
+ clientManager =
+ new IClientManager<TEndPoint, Object>() {
+
+ @Override
+ public Object borrowClient(final TEndPoint node) throws ClientManagerException {
+ borrowAttempts.incrementAndGet();
+ if (failOnBorrow) {
+ throw new ClientManagerException("borrow failed");
+ }
+ return new Object();
+ }
+
+ @Override
+ public void clear(final TEndPoint node) {
+ // Do nothing
+ }
+
+ @Override
+ public void clearAll() {
+ // Do nothing
+ }
+
+ @Override
+ public void close() {
+ // Do nothing
+ }
+ };
+ }
+
+ @Override
+ protected void initActionMapBuilder() {
+ actionMapBuilder.put(
+ TestRequestType.TEST,
+ (request, client, handler) -> {
+ if (failOnDispatch) {
+ throw new RuntimeException("dispatch failed");
+ }
+ Assert.fail("The test client manager should fail before dispatch.");
+ });
+ }
+
+ @Override
+ protected TEndPoint nodeLocationToEndPoint(final TestNodeLocation location) {
+ return location.endPoint;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected AsyncRequestRPCHandler<?, TestRequestType, TestNodeLocation> buildHandler(
+ final AsyncRequestContext<?, ?, TestRequestType, TestNodeLocation> requestContext,
+ final int requestId,
+ final TestNodeLocation targetNode) {
+ return new TestAsyncRequestRPCHandler(
+ requestContext.getRequestType(),
+ requestId,
+ targetNode,
+ requestContext.getNodeLocationMap(),
+ (Map<Integer, String>) requestContext.getResponseMap(),
+ requestContext.getCountDownLatch(),
+ failOnError);
+ }
+ }
+
+ private static class TestAsyncRequestRPCHandler
+ extends AsyncRequestRPCHandler<String, TestRequestType, TestNodeLocation> {
+
+ private TestAsyncRequestRPCHandler(
+ final TestRequestType requestType,
+ final int requestId,
+ final TestNodeLocation targetNode,
+ final Map<Integer, TestNodeLocation> nodeLocationMap,
+ final Map<Integer, String> responseMap,
+ final CountDownLatch countDownLatch,
+ final boolean failOnError) {
+ super(requestType, requestId, targetNode, nodeLocationMap, responseMap, countDownLatch);
+ this.failOnError = failOnError;
+ }
+
+ private final boolean failOnError;
+
+ @Override
+ protected String generateFormattedTargetLocation(final TestNodeLocation location) {
+ return location.endPoint.toString();
+ }
+
+ @Override
+ public void onComplete(final String response) {
+ responseMap.put(requestId, response);
+ nodeLocationMap.remove(requestId);
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(final Exception exception) {
+ if (failOnError) {
+ throw new RuntimeException("handler failed");
+ }
+ responseMap.put(requestId, exception.getMessage());
+ countDownLatch.countDown();
+ }
+ }
+}