This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new efdd472b7d1 Fix pipe runtime meta push blocking user operations (#17909) (#17953)
efdd472b7d1 is described below
commit efdd472b7d103a05ea1796ac05feb0fffe35881f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:01:47 2026 +0800
Fix pipe runtime meta push blocking user operations (#17909) (#17953)
* Fix pipe runtime meta push blocking user operations
* Fix subscription runtime push blocking
* Harden runtime meta sync best effort
* Bound required metadata push waits
* Fix async request dispatch failure hang
* Harden async request failure handling
* Use safe time conversion for runtime meta sync intervals
* Harden subscription runtime meta sync timeouts
* Bound pipe heartbeat metadata collection waits
* Harden best-effort runtime metadata requests
* Fix heartbeat client pool
* Add tests and use real fail message & status
(cherry picked from commit 1df19137839b3e813f1441071469c0a65afe2844)
---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 12 +-
.../java/org/apache/iotdb/rpc/RpcUtilsTest.java | 25 +++
.../async/AsyncAINodeHeartbeatClientPool.java | 27 ++-
.../async/AsyncConfigNodeHeartbeatClientPool.java | 27 ++-
.../async/AsyncDataNodeHeartbeatClientPool.java | 33 ++-
.../rpc/CheckTimeSeriesExistenceRPCHandler.java | 2 +-
.../rpc/CountPathsUsingTemplateRPCHandler.java | 2 +-
.../rpc/FetchSchemaBlackListRPCHandler.java | 2 +-
.../async/handlers/rpc/SchemaUpdateRPCHandler.java | 2 +-
.../CheckSchemaRegionUsingTemplateRPCHandler.java | 2 +-
.../runtime/heartbeat/PipeHeartbeatScheduler.java | 13 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 236 +++++++++++++++++----
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 22 +-
.../runtime/PipeHandleLeaderChangeProcedure.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 2 +-
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 6 +-
.../AbstractOperateSubscriptionProcedure.java | 20 ++
.../runtime/ConsumerGroupMetaSyncProcedure.java | 10 +-
.../topic/runtime/TopicMetaSyncProcedure.java | 9 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 20 +-
.../client/request/AsyncRequestManager.java | 45 +++-
.../client/request/AsyncRequestManagerTest.java | 221 +++++++++++++++++++
22 files changed, 652 insertions(+), 88 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 5670f80f247..88da1ab8cb3 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -122,8 +122,7 @@ public class RpcUtils {
}
public static void verifySuccess(List<TSStatus> statuses) throws
BatchExecutionException {
- StringBuilder errMsgs =
- new StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": ");
+ StringBuilder errMsgs = new StringBuilder();
for (TSStatus status : statuses) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -131,7 +130,8 @@ public class RpcUtils {
}
}
if (errMsgs.length() > 0) {
- throw new BatchExecutionException(statuses, errMsgs.toString());
+ throw new BatchExecutionException(
+ statuses, TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ": " + errMsgs);
}
}
@@ -152,9 +152,9 @@ public class RpcUtils {
for (TSStatus subStatus : statusList) {
if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& subStatus.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- if (!msgSet.contains(status)) {
- errMsg.append(status).append("; ");
- msgSet.add(status);
+ if (!msgSet.contains(subStatus)) {
+ errMsg.append(subStatus).append("; ");
+ msgSet.add(subStatus);
}
}
}
diff --git
a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
index 12969e96904..b785ca83530 100644
---
a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
+++
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/RpcUtilsTest.java
@@ -19,11 +19,15 @@
package org.apache.iotdb.rpc;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+
import org.junit.Assert;
import org.junit.Test;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.Collections;
public class RpcUtilsTest {
@@ -64,4 +68,25 @@ public class RpcUtilsTest {
"1970-01-01T07:59:59.999+08:00",
RpcUtils.parseLongToDateWithPrecision(formatter, -1, zoneId, "ms"));
}
+
+ @Test
+ public void testVerifySuccessListAllowsSuccessfulStatuses() throws
BatchExecutionException {
+ RpcUtils.verifySuccess(
+ Arrays.asList(
+ RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+ RpcUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND)));
+ }
+
+ @Test
+ public void testVerifySuccessListThrowsOnFailure() {
+ TSStatus failedStatus =
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "failed");
+
+ try {
+ RpcUtils.verifySuccess(Collections.singletonList(failedStatus));
+ Assert.fail("Expected BatchExecutionException");
+ } catch (BatchExecutionException e) {
+ Assert.assertEquals(Collections.singletonList(failedStatus),
e.getStatusList());
+ Assert.assertTrue(e.getMessage().contains("failed"));
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index 8ec455142b8..fa1d9adcab3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.ainode.AsyncAINodeServiceClient;
@@ -41,10 +42,30 @@ public class AsyncAINodeHeartbeatClientPool {
public void getAINodeHeartBeat(
TEndPoint endPoint, TAIHeartbeatReq req, AINodeHeartbeatHandler handler)
{
+ AsyncAINodeServiceClient client = null;
+ boolean dispatched = false;
try {
- clientManager.borrowClient(endPoint).getAIHeartbeat(req, handler);
- } catch (Exception ignore) {
- // Just ignore
+ client = clientManager.borrowClient(endPoint);
+ client.getAIHeartbeat(req, handler);
+ dispatched = true;
+ } catch (Exception e) {
+ handleError(handler, e);
+ } finally {
+ // After the async call is dispatched, the client's onComplete/onError callback is
+ // responsible for returning the client. If the RPC was not dispatched (exception
+ // before/during the call), the client must be returned here to prevent pool leakage.
+ if (!dispatched && client != null && clientManager instanceof
ClientManager) {
+ ((ClientManager<TEndPoint, AsyncAINodeServiceClient>) clientManager)
+ .returnClient(endPoint, client);
+ }
+ }
+ }
+
+ private void handleError(final AINodeHeartbeatHandler handler, final
Exception e) {
+ try {
+ handler.onError(e);
+ } catch (final Exception ignore) {
+ // Ignore handler failures in heartbeat best-effort path.
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index a6dffbe0eef..ab30935753b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
@@ -48,10 +49,30 @@ public class AsyncConfigNodeHeartbeatClientPool {
TEndPoint endPoint,
TConfigNodeHeartbeatReq heartbeatReq,
ConfigNodeHeartbeatHandler handler) {
+ AsyncConfigNodeInternalServiceClient client = null;
+ boolean dispatched = false;
try {
-
clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(heartbeatReq,
handler);
- } catch (Exception ignore) {
- // Just ignore
+ client = clientManager.borrowClient(endPoint);
+ client.getConfigNodeHeartBeat(heartbeatReq, handler);
+ dispatched = true;
+ } catch (Exception e) {
+ handleError(handler, e);
+ } finally {
+ // After the async call is dispatched, the client's onComplete/onError callback is
+ // responsible for returning the client. If the RPC was not dispatched (exception
+ // before/during the call), the client must be returned here to prevent pool leakage.
+ if (!dispatched && client != null && clientManager instanceof
ClientManager) {
+ ((ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>)
clientManager)
+ .returnClient(endPoint, client);
+ }
+ }
+ }
+
+ private void handleError(final ConfigNodeHeartbeatHandler handler, final
Exception e) {
+ try {
+ handler.onError(e);
+ } catch (final Exception ignore) {
+ // Ignore handler failures in heartbeat best-effort path.
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index 18a8120a9e4..60c43a0ae71 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
@@ -47,14 +48,38 @@ public class AsyncDataNodeHeartbeatClientPool {
*/
public void getDataNodeHeartBeat(
TEndPoint endPoint, TDataNodeHeartbeatReq req, DataNodeHeartbeatHandler
handler) {
+ AsyncDataNodeInternalServiceClient client = null;
+ boolean dispatched = false;
try {
- clientManager.borrowClient(endPoint).getDataNodeHeartBeat(req, handler);
- } catch (Exception ignore) {
- // Just ignore
+ client = clientManager.borrowClient(endPoint);
+ client.getDataNodeHeartBeat(req, handler);
+ dispatched = true;
+ } catch (Exception e) {
+ handleError(handler, e);
+ } finally {
+ returnClientIfNotDispatched(endPoint, client, dispatched);
+ }
+ }
+
+ // After the async call is dispatched, the client's onComplete/onError callback is responsible
+ // for returning the client. If the RPC was not dispatched (exception before/during the call),
+ // the client must be returned here to prevent pool leakage.
+ private void returnClientIfNotDispatched(
+ TEndPoint endPoint, AsyncDataNodeInternalServiceClient client, boolean
dispatched) {
+ if (!dispatched && client != null && clientManager instanceof
ClientManager) {
+ ((ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
clientManager)
+ .returnClient(endPoint, client);
+ }
+ }
+
+ private void handleError(final DataNodeHeartbeatHandler handler, final
Exception e) {
+ try {
+ handler.onError(e);
+ } catch (final Exception ignore) {
+ // Ignore handler failures in heartbeat best-effort path.
}
}
- // TODO: Is the AsyncDataNodeHeartbeatClientPool must be a singleton?
private static class AsyncDataNodeHeartbeatClientPoolHolder {
private static final AsyncDataNodeHeartbeatClientPool INSTANCE =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
index 3a735691a0e..05f480062cd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CheckTimeSeriesExistenceRPCHandler.java
@@ -75,11 +75,11 @@ public class CheckTimeSeriesExistenceRPCHandler
+ e.getMessage();
LOGGER.error(errorMsg);
- countDownLatch.countDown();
TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
responseMap.put(requestId, resp);
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
index b27c74bb41d..8089e2aa524 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/CountPathsUsingTemplateRPCHandler.java
@@ -75,11 +75,11 @@ public class CountPathsUsingTemplateRPCHandler
+ e.getMessage();
LOGGER.error(errorMsg);
- countDownLatch.countDown();
TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
responseMap.put(requestId, resp);
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
index 693017ec02d..07ce15fd0fb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/FetchSchemaBlackListRPCHandler.java
@@ -77,11 +77,11 @@ public class FetchSchemaBlackListRPCHandler
+ e.getMessage();
LOGGER.error(errorMsg);
- countDownLatch.countDown();
TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
responseMap.put(requestId, resp);
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
index dc2796a232e..0ca64f51fbb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/SchemaUpdateRPCHandler.java
@@ -75,10 +75,10 @@ public class SchemaUpdateRPCHandler extends
DataNodeTSStatusRPCHandler {
+ e.getMessage();
LOGGER.warn(errorMsg);
- countDownLatch.countDown();
responseMap.put(
requestId,
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
index 249e8b51767..1c2f77ef520 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/CheckSchemaRegionUsingTemplateRPCHandler.java
@@ -78,11 +78,11 @@ public class CheckSchemaRegionUsingTemplateRPCHandler
+ e.getMessage();
LOGGER.error(errorMsg);
- countDownLatch.countDown();
TCheckSchemaRegionUsingTemplateResp resp = new
TCheckSchemaRegionUsingTemplateResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
errorMsg)));
responseMap.put(requestId, resp);
+ countDownLatch.countDown();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 8f864b4d5c2..d4611bd2f2e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -50,6 +50,7 @@ public class PipeHeartbeatScheduler {
PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled();
private static final long HEARTBEAT_INTERVAL_SECONDS =
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
+ private static final int PIPE_HEARTBEAT_RETRY_NUM = 1;
private static final ScheduledExecutorService HEARTBEAT_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
@@ -99,12 +100,8 @@ public class PipeHeartbeatScheduler {
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_HEARTBEAT, request,
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
-
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
- * 1000L
- * 2
- / 3);
+ .sendAsyncRequest(
+ clientHandler, PIPE_HEARTBEAT_RETRY_NUM,
getPipeHeartbeatRequestTimeoutInMs(), true);
clientHandler
.getResponseMap()
.forEach(
@@ -133,6 +130,10 @@ public class PipeHeartbeatScheduler {
}
}
+ private static long getPipeHeartbeatRequestTimeoutInMs() {
+ return TimeUnit.SECONDS.toMillis(HEARTBEAT_INTERVAL_SECONDS) * 2 / 3;
+ }
+
public synchronized void stop() {
if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) {
heartbeatFuture.cancel(false);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 6914f8aacf5..a96d12af11c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
@@ -107,6 +108,8 @@ public class ConfigNodeProcedureEnv {
private static final Logger LOG =
LoggerFactory.getLogger(ConfigNodeProcedureEnv.class);
+ private static final int RUNTIME_META_PUSH_RETRY_NUM = 1;
+
/** Add or remove node lock. */
private final LockQueue nodeLock = new LockQueue();
@@ -687,10 +690,23 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushPipeMetaReq, TPushPipeMetaResp>
clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPushPipeMetaResp> pushAllPipeMetaToDataNodesBestEffort(
+ List<ByteBuffer> pipeMetaBinaryList) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPushPipeMetaReq request = new
TPushPipeMetaReq().setPipeMetas(pipeMetaBinaryList);
+
+ final DataNodeAsyncRequestContext<TPushPipeMetaReq, TPushPipeMetaResp>
clientHandler =
+ new DataNodeAsyncRequestContext<>(
+ CnToDnAsyncRequestType.PIPE_PUSH_ALL_META, request,
dataNodeLocationMap);
+ final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -702,10 +718,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushSinglePipeMetaReq,
TPushPipeMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -718,10 +733,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushSinglePipeMetaReq,
TPushPipeMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -735,10 +749,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushMultiPipeMetaReq,
TPushPipeMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -751,10 +764,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushMultiPipeMetaReq,
TPushPipeMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs = getRequiredPipeMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingPipePushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -767,10 +779,23 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushTopicMetaReq, TPushTopicMetaResp>
clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPushTopicMetaResp>
pushAllTopicMetaToDataNodesBestEffort(
+ List<ByteBuffer> topicMetaBinaryList) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPushTopicMetaReq request = new
TPushTopicMetaReq().setTopicMetas(topicMetaBinaryList);
+
+ final DataNodeAsyncRequestContext<TPushTopicMetaReq, TPushTopicMetaResp>
clientHandler =
+ new DataNodeAsyncRequestContext<>(
+ CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META, request,
dataNodeLocationMap);
+ final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -782,7 +807,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushSingleTopicMetaReq,
TPushTopicMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request,
dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseList().stream()
.map(TPushTopicMetaResp::getStatus)
.collect(Collectors.toList());
@@ -797,7 +824,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushSingleTopicMetaReq,
TPushTopicMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META, request,
dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseList().stream()
.map(TPushTopicMetaResp::getStatus)
.collect(Collectors.toList());
@@ -813,10 +842,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushMultiTopicMetaReq,
TPushTopicMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -829,10 +857,9 @@ public class ConfigNodeProcedureEnv {
final DataNodeAsyncRequestContext<TPushMultiTopicMetaReq,
TPushTopicMetaResp> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingTopicPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -847,10 +874,25 @@ public class ConfigNodeProcedureEnv {
clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request,
dataNodeLocationMap);
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
- clientHandler,
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() *
60 * 1000 * 2 / 3);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
+ return clientHandler.getResponseMap();
+ }
+
+ public Map<Integer, TPushConsumerGroupMetaResp>
pushAllConsumerGroupMetaToDataNodesBestEffort(
+ List<ByteBuffer> consumerGroupMetaBinaryList) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TPushConsumerGroupMetaReq request =
+ new
TPushConsumerGroupMetaReq().setConsumerGroupMetas(consumerGroupMetaBinaryList);
+
+ final DataNodeAsyncRequestContext<TPushConsumerGroupMetaReq,
TPushConsumerGroupMetaResp>
+ clientHandler =
+ new DataNodeAsyncRequestContext<>(
+ CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META, request,
dataNodeLocationMap);
+ final long timeoutInMs = sendBestEffortRuntimeMetaRequest(clientHandler);
+ fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseMap();
}
@@ -866,7 +908,9 @@ public class ConfigNodeProcedureEnv {
CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
request,
dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseList().stream()
.map(TPushConsumerGroupMetaResp::getStatus)
.collect(Collectors.toList());
@@ -884,12 +928,124 @@ public class ConfigNodeProcedureEnv {
CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
request,
dataNodeLocationMap);
-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+ final long timeoutInMs =
getRequiredSubscriptionMetadataRequestTimeoutInMs();
+ sendRequiredMetadataRequest(clientHandler, timeoutInMs);
+ fillMissingConsumerGroupPushMetaResponses(clientHandler, timeoutInMs);
return clientHandler.getResponseList().stream()
.map(TPushConsumerGroupMetaResp::getStatus)
.collect(Collectors.toList());
}
+ private static long sendBestEffortRuntimeMetaRequest(
+ final DataNodeAsyncRequestContext<?, ?> clientHandler) { // Sends the request with keepSilent=true and returns the timeout that was used.
+ final long timeoutInMs = getRuntimeMetaPushTimeoutInMs(); // Timeout is derived from the pipe-meta heartbeat collection interval.
+ sendRuntimeMetaRequest(clientHandler, true, timeoutInMs);
+ return timeoutInMs; // Handed back so callers can pass it on when filling in missing responses.
+ }
+
+ private static void sendRequiredMetadataRequest(
+ final DataNodeAsyncRequestContext<?, ?> clientHandler, final long
timeoutInMs) { // Sends the request with a caller-supplied timeout and keepSilent=false.
+ sendRuntimeMetaRequest(clientHandler, false, timeoutInMs);
+ }
+
+ private static void sendRuntimeMetaRequest(
+ final DataNodeAsyncRequestContext<?, ?> clientHandler,
+ final boolean keepSilent,
+ final long timeoutInMs) { // Shared send path used by both the best-effort and the required variants.
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(clientHandler, RUNTIME_META_PUSH_RETRY_NUM,
timeoutInMs, keepSilent); // Retry count is fixed by the RUNTIME_META_PUSH_RETRY_NUM constant.
+ }
+
+ private static void fillMissingPipePushMetaResponses(
+ final DataNodeAsyncRequestContext<?, TPushPipeMetaResp> clientHandler,
+ final long timeoutInMs) { // Ensures every index returned by getRequestIndices() has an entry in the response map.
+ clientHandler
+ .getRequestIndices()
+ .forEach(
+ dataNodeId ->
+ clientHandler
+ .getResponseMap()
+ .putIfAbsent( // Responses that were already recorded are left untouched.
+ dataNodeId,
+ new TPushPipeMetaResp(
+ createMetadataTimeoutStatus(
+ clientHandler.getRequestType(),
+ dataNodeId,
+ TSStatusCode.PIPE_PUSH_META_ERROR, // Status code reported for the synthesized pipe response.
+ timeoutInMs))));
+ }
+
+ private static void fillMissingTopicPushMetaResponses(
+ final DataNodeAsyncRequestContext<?, TPushTopicMetaResp> clientHandler,
+ final long timeoutInMs) { // Ensures every index returned by getRequestIndices() has an entry in the response map.
+ clientHandler
+ .getRequestIndices()
+ .forEach(
+ dataNodeId ->
+ clientHandler
+ .getResponseMap()
+ .putIfAbsent( // Responses that were already recorded are left untouched.
+ dataNodeId,
+ new TPushTopicMetaResp(
+ createMetadataTimeoutStatus(
+ clientHandler.getRequestType(),
+ dataNodeId,
+ TSStatusCode.TOPIC_PUSH_META_ERROR, // Status code reported for the synthesized topic response.
+ timeoutInMs))));
+ }
+
+ private static void fillMissingConsumerGroupPushMetaResponses(
+ final DataNodeAsyncRequestContext<?, TPushConsumerGroupMetaResp>
clientHandler,
+ final long timeoutInMs) { // Ensures every index returned by getRequestIndices() has an entry in the response map.
+ clientHandler
+ .getRequestIndices()
+ .forEach(
+ dataNodeId ->
+ clientHandler
+ .getResponseMap()
+ .putIfAbsent( // Responses that were already recorded are left untouched.
+ dataNodeId,
+ new TPushConsumerGroupMetaResp(
+ createMetadataTimeoutStatus(
+ clientHandler.getRequestType(),
+ dataNodeId,
+ TSStatusCode.CONSUMER_PUSH_META_ERROR, // Status code reported for the synthesized consumer group response.
+ timeoutInMs))));
+ }
+
+ private static TSStatus createMetadataTimeoutStatus(
+ final CnToDnAsyncRequestType requestType,
+ final int dataNodeId,
+ final TSStatusCode statusCode,
+ final long timeoutInMs) { // Builds the TSStatus placed into responses synthesized by the fillMissing*Responses helpers.
+ return new TSStatus(statusCode.getStatusCode())
+ .setMessage(
+ String.format(
+ "Failed to %s on DataNode %s before metadata request timeout
%sms",
+ requestType, dataNodeId, timeoutInMs)); // Message names the request type, the DataNode id and the timeout in ms.
+ }
+
+ private static long getRuntimeMetaPushTimeoutInMs() { // Timeout in milliseconds for best-effort runtime meta pushes.
+ return TimeUnit.SECONDS.toMillis(
+
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta())
+ * 2
+ / 3; // Two thirds of the configured heartbeat interval (integer division).
+ }
+
+ private static long getRequiredPipeMetadataRequestTimeoutInMs() { // Timeout in milliseconds based on the pipe meta syncer interval.
+ return TimeUnit.MINUTES.toMillis(
+ PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes())
+ * 2
+ / 3; // Two thirds of the configured sync interval (integer division).
+ }
+
+ private static long getRequiredSubscriptionMetadataRequestTimeoutInMs() { // Timeout in milliseconds based on the subscription meta syncer interval.
+ return TimeUnit.MINUTES.toMillis(
+
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+ * 2
+ / 3; // Two thirds of the configured sync interval (integer division).
+ }
+
public LockQueue getNodeLock() { // Accessor for the nodeLock field.
return nodeLock;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 857d5733f6a..2fa0091568f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -464,10 +464,11 @@ public abstract class AbstractOperatePipeProcedureV2
if (resp.getStatus().getCode() ==
TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()) {
if (!resp.isSetExceptionMessages()) {
+ final String statusMessage = resp.getStatus().getMessage();
exceptionMessageBuilder.append(
String.format(
- "DataNodeId: %s, Message: Internal error while processing
pushPipeMeta on dataNodes.",
- dataNodeId));
+ "DataNodeId: %s, Message: Internal error while processing
pushPipeMeta on dataNodes.%s",
+ dataNodeId, statusMessage == null ? "" : " " +
statusMessage));
continue;
}
@@ -515,6 +516,23 @@ public abstract class AbstractOperatePipeProcedureV2
}
}
+ protected Map<Integer, TPushPipeMetaResp>
pushPipeMetaToDataNodesBestEffortAndGetResponse(
+ ConfigNodeProcedureEnv env) throws IOException { // Serializes every pipe meta in pipeTaskInfo and pushes the list through the best-effort path.
+ final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+ for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
+ pipeMetaBinaryList.add(pipeMeta.serialize()); // NOTE(review): presumably the source of the declared IOException - confirm.
+ }
+ return env.pushAllPipeMetaToDataNodesBestEffort(pipeMetaBinaryList); // Returns whatever response map the env call produces.
+ }
+
+ protected void pushPipeMetaToDataNodesBestEffort(ConfigNodeProcedureEnv env)
{ // Variant that discards the response map and never propagates an exception.
+ try {
+ pushPipeMetaToDataNodesBestEffortAndGetResponse(env);
+ } catch (Exception e) { // Any failure is logged at info level and not rethrown.
+ LOGGER.info("Failed to push pipe meta list to data nodes, will retry
later.", e);
+ }
+ }
+
/**
* Pushing one pipeMeta to all the dataNodes, forcing an update to the
pipe's runtime state.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 61f6f3cae2a..300d04ee11b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -111,7 +111,7 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleLeaderChangeProcedure:
executeFromHandleOnDataNodes");
- pushPipeMetaToDataNodesIgnoreException(env);
+ pushPipeMetaToDataNodesBestEffort(env);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 401859f0a7e..3089b3dd072 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -128,7 +128,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
return;
}
- pushPipeMetaToDataNodesIgnoreException(env);
+ pushPipeMetaToDataNodesBestEffort(env);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 282b540b3aa..9764a3af3a5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -50,7 +51,8 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeMetaSyncProcedure.class);
private static final long MIN_EXECUTION_INTERVAL_MS =
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 *
1000 / 2;
+
TimeUnit.MINUTES.toMillis(PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes())
+ / 2;
// No need to serialize this field
private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
@@ -134,7 +136,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
throws PipeException, IOException {
LOGGER.debug("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
- Map<Integer, TPushPipeMetaResp> respMap = pushPipeMetaToDataNodes(env);
+ Map<Integer, TPushPipeMetaResp> respMap =
pushPipeMetaToDataNodesBestEffortAndGetResponse(env);
if (pipeTaskInfo.get().recordDataNodePushPipeMetaExceptions(respMap)) {
throw new PipeException(
String.format(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 0b246ac4ef7..8f7c17f9ca7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -332,6 +332,16 @@ public abstract class AbstractOperateSubscriptionProcedure
return env.pushAllTopicMetaToDataNodes(topicMetaBinaryList);
}
+ protected Map<Integer, TPushTopicMetaResp>
pushTopicMetaToDataNodesBestEffort(
+ ConfigNodeProcedureEnv env) throws IOException { // Serializes every topic meta in subscriptionInfo and pushes the list through the best-effort path.
+ final List<ByteBuffer> topicMetaBinaryList = new ArrayList<>();
+ for (TopicMeta topicMeta : subscriptionInfo.get().getAllTopicMeta()) {
+ topicMetaBinaryList.add(topicMeta.serialize()); // NOTE(review): presumably the source of the declared IOException - confirm.
+ }
+
+ return env.pushAllTopicMetaToDataNodesBestEffort(topicMetaBinaryList); // Returns whatever response map the env call produces.
+ }
+
public static boolean pushTopicMetaHasException(Map<Integer,
TPushTopicMetaResp> respMap) {
for (TPushTopicMetaResp resp : respMap.values()) {
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -358,6 +368,16 @@ public abstract class AbstractOperateSubscriptionProcedure
return
env.pushAllConsumerGroupMetaToDataNodes(consumerGroupMetaBinaryList);
}
+ protected Map<Integer, TPushConsumerGroupMetaResp>
pushConsumerGroupMetaToDataNodesBestEffort(
+ ConfigNodeProcedureEnv env) throws IOException { // Serializes every consumer group meta in subscriptionInfo and pushes the list through the best-effort path.
+ final List<ByteBuffer> consumerGroupMetaBinaryList = new ArrayList<>();
+ for (ConsumerGroupMeta consumerGroupMeta :
subscriptionInfo.get().getAllConsumerGroupMeta()) {
+ consumerGroupMetaBinaryList.add(consumerGroupMeta.serialize()); // NOTE(review): presumably the source of the declared IOException - confirm.
+ }
+
+ return
env.pushAllConsumerGroupMetaToDataNodesBestEffort(consumerGroupMetaBinaryList); // Returns whatever response map the env call produces.
+ }
+
public static boolean pushConsumerGroupMetaHasException(
Map<Integer, TPushConsumerGroupMetaResp> respMap) {
for (TPushConsumerGroupMetaResp resp : respMap.values()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index 93eb6c5a5fc..0f6b5b231d2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -20,7 +20,7 @@
package
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -51,7 +52,9 @@ public class ConsumerGroupMetaSyncProcedure extends
AbstractOperateSubscriptionP
LoggerFactory.getLogger(ConsumerGroupMetaSyncProcedure.class);
private static final long MIN_EXECUTION_INTERVAL_MS =
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 *
1000 / 2;
+ TimeUnit.MINUTES.toMillis(
+
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+ / 2;
// No need to serialize this field
private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
@@ -127,7 +130,8 @@ public class ConsumerGroupMetaSyncProcedure extends
AbstractOperateSubscriptionP
throws SubscriptionException, IOException {
LOGGER.info("ConsumerGroupMetaSyncProcedure:
executeFromOperateOnDataNodes");
- Map<Integer, TPushConsumerGroupMetaResp> respMap =
pushConsumerGroupMetaToDataNodes(env);
+ Map<Integer, TPushConsumerGroupMetaResp> respMap =
+ pushConsumerGroupMetaToDataNodesBestEffort(env);
if (pushConsumerGroupMetaHasException(respMap)) {
throw new SubscriptionException(
String.format("Failed to push consumer group meta to dataNodes,
details: %s", respMap));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index 27919bbcb9e..da94ca0ec72 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -50,7 +51,9 @@ public class TopicMetaSyncProcedure extends
AbstractOperateSubscriptionProcedure
private static final Logger LOGGER =
LoggerFactory.getLogger(TopicMetaSyncProcedure.class);
private static final long MIN_EXECUTION_INTERVAL_MS =
- PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes() * 60 *
1000 / 2;
+ TimeUnit.MINUTES.toMillis(
+
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes())
+ / 2;
// No need to serialize this field
private static final AtomicLong LAST_EXECUTION_TIME = new AtomicLong(0);
@@ -126,7 +129,7 @@ public class TopicMetaSyncProcedure extends
AbstractOperateSubscriptionProcedure
throws SubscriptionException, IOException {
LOGGER.info("TopicMetaSyncProcedure: executeFromOperateOnDataNodes");
- Map<Integer, TPushTopicMetaResp> respMap = pushTopicMetaToDataNodes(env);
+ Map<Integer, TPushTopicMetaResp> respMap =
pushTopicMetaToDataNodesBestEffort(env);
if (pushTopicMetaHasException(respMap)) {
throw new SubscriptionException(
String.format("Failed to push topic meta to dataNodes, details: %s",
respMap));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f5c84a2375f..fcaef4baa30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -327,6 +327,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private final DataNodeThrottleQuotaManager throttleQuotaManager =
DataNodeThrottleQuotaManager.getInstance();
+ private static final long TEST_CONNECTION_TIMEOUT_MS =
+ CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
+ private static final int TEST_CONNECTION_RETRY_NUM = 1;
+
private final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
private final ExecutorService schemaExecutor =
@@ -1613,7 +1617,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.ConfigNodeInternalService,
DnToCnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToCnRequestType,
TConfigNodeLocation> handler) ->
-
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+ DnToCnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(
+ handler, TEST_CONNECTION_RETRY_NUM,
TEST_CONNECTION_TIMEOUT_MS, true));
}
private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
@@ -1625,7 +1631,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeInternalService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
-
DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+ DnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(
+ handler, TEST_CONNECTION_RETRY_NUM,
TEST_CONNECTION_TIMEOUT_MS, true));
}
private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -1637,7 +1645,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeMPPService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
-
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+ DataNodeMPPServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(
+ handler, TEST_CONNECTION_RETRY_NUM,
TEST_CONNECTION_TIMEOUT_MS, true));
}
private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -1649,7 +1659,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeExternalService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
-
DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+ DataNodeExternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(
+ handler, TEST_CONNECTION_RETRY_NUM,
TEST_CONNECTION_TIMEOUT_MS, true));
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index 0290d33d3a4..45a9d6f4817 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.client.request;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.utils.function.CheckedTriConsumer;
@@ -104,6 +105,14 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
AsyncRequestContext<?, ?, RequestType, NodeLocation> requestContext,
int retryNum,
Long timeoutInMs) {
+ sendAsyncRequest(requestContext, retryNum, timeoutInMs, false);
+ }
+
+ public void sendAsyncRequest(
+ AsyncRequestContext<?, ?, RequestType, NodeLocation> requestContext,
+ int retryNum,
+ Long timeoutInMs,
+ boolean keepSilent) {
if (requestContext.getRequestIndices().isEmpty()) {
return;
}
@@ -143,7 +152,7 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
}
}
- if (!requestContext.getRequestIndices().isEmpty()) {
+ if (!requestContext.getRequestIndices().isEmpty() && !keepSilent) {
LOGGER.warn(
"Failed to {} on ConfigNode after {} retries, requestIndices: {}",
requestType,
@@ -157,6 +166,10 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
int requestId,
NodeLocation targetNode,
int retryCount) {
+ final TEndPoint endPoint = nodeLocationToEndPoint(targetNode);
+ Client client = null;
+ boolean dispatched = false;
+ AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler = null;
try {
if (!actionMap.containsKey(requestContext.getRequestType())) {
throw new UnsupportedOperationException(
@@ -164,20 +177,42 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
+ requestContext.getRequestType()
+ ", please set it in
AsyncRequestManager::initActionMapBuilder()");
}
- Client client =
clientManager.borrowClient(nodeLocationToEndPoint(targetNode));
+ handler = buildHandler(requestContext, requestId, targetNode);
+ client = clientManager.borrowClient(endPoint);
adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
Object req = requestContext.getRequest(requestId);
- AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
- buildHandler(requestContext, requestId, targetNode);
Objects.requireNonNull(actionMap.get(requestContext.getRequestType()))
.accept(req, client, handler);
+ // After accept() returns, the async callback (onComplete/onError) takes
over the
+ // responsibility of returning the client to the pool. Before this
point, if any exception
+ // is thrown, the client must be returned/invalidated here to prevent
pool leakage.
+ dispatched = true;
} catch (Exception e) {
LOGGER.warn(
"{} failed on Node {}, because {}, retrying {}...",
requestContext.getRequestType(),
- nodeLocationToEndPoint(targetNode),
+ endPoint,
e.getMessage(),
retryCount);
+ if (handler != null) {
+ try {
+ handler.onError(e);
+ } catch (final Exception handlerException) {
+ LOGGER.warn(
+ "Failed to handle async request error for request type {} on
node {}: {}",
+ requestContext.getRequestType(),
+ endPoint,
+ handlerException.getMessage(),
+ handlerException);
+ requestContext.getCountDownLatch().countDown();
+ }
+ } else {
+ requestContext.getCountDownLatch().countDown();
+ }
+ } finally {
+ if (!dispatched && client != null && clientManager instanceof
ClientManager) {
+ ((ClientManager<TEndPoint, Client>)
clientManager).returnClient(endPoint, client);
+ }
}
}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
new file mode 100644
index 00000000000..c01cc814b3d
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/request/AsyncRequestManagerTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.request;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AsyncRequestManagerTest { // Exercises AsyncRequestManager.sendAsyncRequest when dispatching a request fails.
+
+ @Test
+ public void dispatchFailureShouldNotBlockRetryWithoutTimeout() throws
Exception {
+ final TestAsyncRequestManager manager = new TestAsyncRequestManager(); // Default configuration: borrowClient always throws.
+ final AsyncRequestContext<String, String, TestRequestType,
TestNodeLocation> context =
+ new AsyncRequestContext<>(
+ TestRequestType.TEST,
+ "request",
+ Collections.singletonMap(1, new TestNodeLocation(new
TEndPoint("localhost", 6667))));
+
+ final ExecutorService executorService =
Executors.newSingleThreadExecutor();
+ try {
+ final Future<?> future =
+ executorService.submit(() -> manager.sendAsyncRequest(context, 2,
null, true)); // retryNum=2, no timeout, keepSilent=true.
+ future.get(3, TimeUnit.SECONDS); // Throws (failing the test) if the send has not returned within 3 seconds.
+ } finally {
+ executorService.shutdownNow();
+ }
+
+ Assert.assertEquals(2, manager.getBorrowAttempts()); // Expects one borrow attempt per retry round.
+ Assert.assertEquals("borrow failed", context.getResponseMap().get(1)); // The handler's onError stored the exception message as the response.
+ Assert.assertEquals(Collections.singletonList(1),
context.getRequestIndices()); // Index 1 is expected to remain in the request indices.
+ }
+
+ @Test
+ public void handlerErrorFailureShouldNotEscapeDispatchFailure() throws
Exception {
+ final TestAsyncRequestManager manager = new TestAsyncRequestManager(false,
true, true); // Borrow succeeds; the dispatch action and handler.onError both throw.
+ final AsyncRequestContext<String, String, TestRequestType,
TestNodeLocation> context =
+ new AsyncRequestContext<>(
+ TestRequestType.TEST,
+ "request",
+ Collections.singletonMap(1, new TestNodeLocation(new
TEndPoint("localhost", 6667))));
+
+ final ExecutorService executorService =
Executors.newSingleThreadExecutor();
+ try {
+ final Future<?> future =
+ executorService.submit(() -> manager.sendAsyncRequest(context, 1,
null, true)); // retryNum=1, no timeout, keepSilent=true.
+ future.get(3, TimeUnit.SECONDS); // Throws (failing the test) if the send has not returned within 3 seconds.
+ } finally {
+ executorService.shutdownNow();
+ }
+
+ Assert.assertEquals(1, manager.getBorrowAttempts());
+ Assert.assertTrue(context.getResponseMap().isEmpty()); // The throwing handler never recorded a response.
+ Assert.assertEquals(Collections.singletonList(1),
context.getRequestIndices());
+ }
+
+ private enum TestRequestType { // Single request type registered in the test action map.
+ TEST
+ }
+
+ private static class TestNodeLocation { // Node location stub that only wraps an endpoint.
+
+ private final TEndPoint endPoint;
+
+ private TestNodeLocation(final TEndPoint endPoint) {
+ this.endPoint = endPoint;
+ }
+ }
+
+ private static class TestAsyncRequestManager
+ extends AsyncRequestManager<TestRequestType, TestNodeLocation, Object> { // Manager whose failure points are selected by three flags.
+
+ private final AtomicInteger borrowAttempts = new AtomicInteger(); // Incremented on every borrowClient call.
+ private boolean failOnBorrow; // When true, borrowClient throws ClientManagerException.
+ private boolean failOnDispatch; // When true, the registered action throws RuntimeException.
+ private boolean failOnError; // When true, the handler's onError throws RuntimeException.
+
+ private TestAsyncRequestManager() {
+ this(true, false, false);
+ }
+
+ private TestAsyncRequestManager(
+ final boolean failOnBorrow, final boolean failOnDispatch, final
boolean failOnError) {
+ super(1);
+ this.failOnBorrow = failOnBorrow;
+ this.failOnDispatch = failOnDispatch;
+ this.failOnError = failOnError;
+ }
+
+ private int getBorrowAttempts() {
+ return borrowAttempts.get();
+ }
+
+ @Override
+ protected void initClientManager(final int
selectorNumOfAsyncClientManager) {
+ clientManager =
+ new IClientManager<TEndPoint, Object>() { // Minimal client manager that counts borrow attempts.
+
+ @Override
+ public Object borrowClient(final TEndPoint node) throws
ClientManagerException {
+ borrowAttempts.incrementAndGet();
+ if (failOnBorrow) {
+ throw new ClientManagerException("borrow failed");
+ }
+ return new Object();
+ }
+
+ @Override
+ public void clear(final TEndPoint node) {
+ // Do nothing
+ }
+
+ @Override
+ public void close() {
+ // Do nothing
+ }
+ };
+ }
+
+ @Override
+ protected void initActionMapBuilder() {
+ actionMapBuilder.put(
+ TestRequestType.TEST,
+ (request, client, handler) -> {
+ if (failOnDispatch) {
+ throw new RuntimeException("dispatch failed");
+ }
+ Assert.fail("The test client manager should fail before
dispatch."); // Reached only when the action runs without failOnDispatch set.
+ });
+ }
+
+ @Override
+ protected TEndPoint nodeLocationToEndPoint(final TestNodeLocation
location) {
+ return location.endPoint;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected AsyncRequestRPCHandler<?, TestRequestType, TestNodeLocation>
buildHandler(
+ final AsyncRequestContext<?, ?, TestRequestType, TestNodeLocation>
requestContext,
+ final int requestId,
+ final TestNodeLocation targetNode) {
+ return new TestAsyncRequestRPCHandler(
+ requestContext.getRequestType(),
+ requestId,
+ targetNode,
+ requestContext.getNodeLocationMap(),
+ (Map<Integer, String>) requestContext.getResponseMap(), // Unchecked cast: the tests only use String responses.
+ requestContext.getCountDownLatch(),
+ failOnError);
+ }
+ }
+
+ private static class TestAsyncRequestRPCHandler
+ extends AsyncRequestRPCHandler<String, TestRequestType,
TestNodeLocation> { // Handler that records String responses and can be told to throw from onError.
+
+ private TestAsyncRequestRPCHandler(
+ final TestRequestType requestType,
+ final int requestId,
+ final TestNodeLocation targetNode,
+ final Map<Integer, TestNodeLocation> nodeLocationMap,
+ final Map<Integer, String> responseMap,
+ final CountDownLatch countDownLatch,
+ final boolean failOnError) {
+ super(requestType, requestId, targetNode, nodeLocationMap, responseMap,
countDownLatch);
+ this.failOnError = failOnError;
+ }
+
+ private final boolean failOnError;
+
+ @Override
+ protected String generateFormattedTargetLocation(final TestNodeLocation
location) {
+ return location.endPoint.toString();
+ }
+
+ @Override
+ public void onComplete(final String response) { // Records the response, removes the node from the location map and counts down.
+ responseMap.put(requestId, response);
+ nodeLocationMap.remove(requestId);
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(final Exception exception) { // Either throws (failOnError) or records the exception message and counts down.
+ if (failOnError) {
+ throw new RuntimeException("handler failed");
+ }
+ responseMap.put(requestId, exception.getMessage());
+ countDownLatch.countDown();
+ }
+ }
+}