This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 958ce45b137 Fix repeated RPC dispatch reusing a released
FragmentInstanceContext (NPE) (#17794)
958ce45b137 is described below
commit 958ce45b13722f04bbc3837ab76caad207e15216
Author: Jackie Tien <[email protected]>
AuthorDate: Mon Jun 1 10:39:38 2026 +0800
Fix repeated RPC dispatch reusing a released FragmentInstanceContext (NPE) (#17794)
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../execution/executor/RegionReadExecutor.java | 23 ++++++-
.../fragment/FragmentInstanceManager.java | 79 ++++++++++++++++------
.../scheduler/FragmentInstanceDispatcherImpl.java | 15 ++++
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 2 +-
.../execution/executor/RegionReadExecutorTest.java | 46 +++++++++++++
6 files changed, 141 insertions(+), 25 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index e5014681fa7..035c648132f 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -146,6 +146,7 @@ public enum TSStatusCode {
QUERY_TIMEOUT(720),
PLAN_FAILED_NETWORK_PARTITION(721),
CANNOT_FETCH_FI_STATE(722),
+ REPEATED_RPC_CALL(723),
// OBJECT
OBJECT_NOT_EXISTS(740),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index b393919cf77..07c1574e2aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -127,6 +128,13 @@ public class RegionReadExecutor {
|| t instanceof InterruptedException) {
resp.setReadNeedRetry(true);
resp.setStatus(new
TSStatus(TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()));
+ } else if (t instanceof IoTDBRuntimeException) {
+ // Carry the original status code (e.g. REPEATED_RPC_CALL) back to the
dispatcher so it is
+ // not downgraded to EXECUTE_STATEMENT_ERROR; needRetryHelper decides
retryability.
+ TSStatus status = new TSStatus(((IoTDBRuntimeException)
t).getErrorCode());
+ status.setMessage(t.getMessage());
+ resp.setStatus(status);
+ resp.setReadNeedRetry(StatusUtils.needRetryHelper(status));
}
return resp;
}
@@ -156,8 +164,19 @@ public class RegionReadExecutor {
}
} catch (Throwable t) {
LOGGER.warn(DataNodeQueryMessages.EXECUTE_FRAGMENTINSTANCE_IN_QUERYEXECUTOR_FAILED,
t);
- return RegionExecutionResult.create(
- false, String.format(ERROR_MSG_FORMAT, t.getMessage()), null);
+ RegionExecutionResult resp =
+ RegionExecutionResult.create(
+ false, String.format(ERROR_MSG_FORMAT, t.getMessage()), null);
+ Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(t);
+ if (rootCause instanceof IoTDBRuntimeException) {
+ // Carry the original status code (e.g. REPEATED_RPC_CALL) back to the
dispatcher so it is
+ // not downgraded to EXECUTE_STATEMENT_ERROR; needRetryHelper decides
retryability.
+ TSStatus status = new TSStatus(((IoTDBRuntimeException)
rootCause).getErrorCode());
+ status.setMessage(rootCause.getMessage());
+ resp.setStatus(status);
+ resp.setReadNeedRetry(StatusUtils.needRetryHelper(status));
+ }
+ return resp;
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 37e5c6c0858..e5f3cbc7e0d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -147,23 +147,30 @@ public class FragmentInstanceManager {
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
- int dataNodeFINum = instance.getDataNodeFINum();
- DataNodeQueryContext dataNodeQueryContext =
- getOrCreateDataNodeQueryContext(instanceId.getQueryId(),
dataNodeFINum);
-
+ boolean[] contextCreated = new boolean[] {false};
+ DataNodeQueryContext[] dataNodeQueryContexts = new
DataNodeQueryContext[1];
FragmentInstanceContext context =
instanceContext.computeIfAbsent(
instanceId,
- fragmentInstanceId ->
- createFragmentInstanceContext(
- fragmentInstanceId,
- stateMachine,
- instance.getSessionInfo(),
- dataRegion,
- instance.getGlobalTimePredicate(),
- dataNodeQueryContextMap,
- instance.isDebug(),
- instance.isVerbose()));
+ fragmentInstanceId -> {
+ contextCreated[0] = true;
+ // Only ensure the DataNodeQueryContext when we
actually create the
+ // FragmentInstanceContext, so the repeated-dispatch
path (which rejects
+ // without creating a context) does not leak a
context entry.
+ dataNodeQueryContexts[0] =
+ getOrCreateDataNodeQueryContext(
+ instanceId.getQueryId(),
instance.getDataNodeFINum());
+ return createFragmentInstanceContext(
+ fragmentInstanceId,
+ stateMachine,
+ instance.getSessionInfo(),
+ dataRegion,
+ instance.getGlobalTimePredicate(),
+ dataNodeQueryContextMap,
+ instance.isDebug(),
+ instance.isVerbose());
+ });
+ rejectIfRepeatedDispatch(contextCreated[0], instanceId);
context.setHighestPriority(instance.isHighestPriority());
try {
@@ -172,7 +179,7 @@ public class FragmentInstanceManager {
instance.getFragment().getPlanNodeTree(),
instance.getFragment().getTypeProvider(),
context,
- dataNodeQueryContext);
+ dataNodeQueryContexts[0]);
List<IDriver> drivers = new ArrayList<>();
driverFactories.forEach(factory ->
drivers.add(factory.createDriver()));
@@ -246,6 +253,30 @@ public class FragmentInstanceManager {
}
}
+ /**
+ * If {@code instanceContext.computeIfAbsent} returned an existing {@link
FragmentInstanceContext}
+ * for this {@code instanceId} (i.e. {@code contextCreated} is false), the
same FragmentInstance
+ * has been dispatched before (e.g. an RPC retry in {@code
+ * FragmentInstanceDispatcherImpl#dispatchRemote}). The previous execution
may have already
+ * released its resources (dataRegion == null), so reusing this cached
context would run a fresh
+ * driver against a released context and trigger an NPE. Reject the
duplicated dispatch with
+ * REPEATED_RPC_CALL instead of reusing it.
+ *
+ * <p>This must be called before the planning try block on purpose, so it
propagates up
+ * (RegionReadExecutor carries the status code) without touching the first
execution's cached
+ * resources.
+ */
+ private static void rejectIfRepeatedDispatch(
+ boolean contextCreated, FragmentInstanceId instanceId) {
+ if (!contextCreated) {
+ throw new IoTDBRuntimeException(
+ String.format(
+ "Repeated RPC call detected for FragmentInstance %s, reject the
duplicated dispatch.",
+ instanceId.getFullId()),
+ TSStatusCode.REPEATED_RPC_CALL.getStatusCode());
+ }
+ }
+
private void clearFIRelatedResources(FragmentInstanceId instanceId) {
// close and remove all the handles of the fragment instance
exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift());
@@ -270,16 +301,20 @@ public class FragmentInstanceManager {
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ boolean[] contextCreated = new boolean[] {false};
FragmentInstanceContext context =
instanceContext.computeIfAbsent(
instanceId,
- fragmentInstanceId ->
- createFragmentInstanceContext(
- fragmentInstanceId,
- stateMachine,
- instance.getSessionInfo(),
- instance.isDebug(),
- instance.isVerbose()));
+ fragmentInstanceId -> {
+ contextCreated[0] = true;
+ return createFragmentInstanceContext(
+ fragmentInstanceId,
+ stateMachine,
+ instance.getSessionInfo(),
+ instance.isDebug(),
+ instance.isVerbose());
+ });
+ rejectIfRepeatedDispatch(contextCreated[0], instanceId);
context.setHighestPriority(instance.isHighestPriority());
try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index aa5b8350f87..4b3165a9ed5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult;
@@ -589,6 +590,20 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
"can't execute request on node {}, error msg is {}, and we try to
reconnect this node.",
endPoint,
ExceptionUtils.getRootCause(e).toString());
+ // If the query has already timed out, do not retry. Re-dispatching the
same FragmentInstance
+ // may cause it to be executed twice on the remote node (see the
REPEATED_RPC_CALL handling in
+ // FragmentInstanceManager), so we fail fast with a timeout status
instead.
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - queryContext.getStartTime() >=
queryContext.getTimeOut()) {
+ throw new FragmentInstanceDispatchException(
+ RpcUtils.getStatus(
+ TSStatusCode.QUERY_TIMEOUT,
+ String.format(
+
QueryTimeoutRuntimeException.QUERY_TIMEOUT_EXCEPTION_MESSAGE,
+ queryContext.getStartTime(),
+ queryContext.getStartTime() + queryContext.getTimeOut(),
+ currentTime)));
+ }
// we just retry once to clear stale connection for a restart node.
try {
dispatchRemoteHelper(instance, endPoint);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 533fa8ab640..103549a9cc7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -160,7 +160,7 @@ public class ErrorHandlingUtils {
Throwable t = e instanceof ExecutionException ? e.getCause() : e;
if (t instanceof QueryTimeoutRuntimeException) {
- return RpcUtils.getStatus(TSStatusCode.INTERNAL_REQUEST_TIME_OUT,
rootCause.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.QUERY_TIMEOUT,
rootCause.getMessage());
} else if (t instanceof ParseCancellationException) {
return RpcUtils.getStatus(
TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR +
rootCause.getMessage());
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
index 016fe90b460..b0b5fb805a3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
@@ -30,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Test;
import org.mockito.Mockito;
@@ -161,6 +163,50 @@ public class RegionReadExecutorTest {
assertEquals(String.format(ERROR_MSG_FORMAT, "schema-exception"),
res.getMessage());
}
+ @Test
+ public void testRepeatedRpcCall() throws ConsensusException {
+ // A repeated RPC dispatch of the same FragmentInstance makes
FragmentInstanceManager throw
+ // IoTDBRuntimeException(REPEATED_RPC_CALL). RegionReadExecutor must carry
that status code back
+ // (instead of dropping it, which would downgrade it to
EXECUTE_STATEMENT_ERROR) and mark the
+ // result as non-retryable.
+ ConsensusGroupId dataRegionGroupId = new DataRegionId(1);
+ FragmentInstanceId fragmentInstanceId =
+ new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
+ FragmentInstance fragmentInstance = Mockito.mock(FragmentInstance.class);
+ Mockito.when(fragmentInstance.getId()).thenReturn(fragmentInstanceId);
+
+ IConsensus dataRegionConsensus = Mockito.mock(IConsensus.class);
+ IConsensus schemaRegionConsensus = Mockito.mock(IConsensus.class);
+ FragmentInstanceManager fragmentInstanceManager =
Mockito.mock(FragmentInstanceManager.class);
+
+ RegionReadExecutor executor =
+ new RegionReadExecutor(dataRegionConsensus, schemaRegionConsensus,
fragmentInstanceManager);
+
+ // consensus read path (covers both data and schema region queries)
+ Mockito.when(dataRegionConsensus.read(dataRegionGroupId, fragmentInstance))
+ .thenThrow(
+ new IoTDBRuntimeException("repeated",
TSStatusCode.REPEATED_RPC_CALL.getStatusCode()));
+
+ RegionExecutionResult res = executor.execute(dataRegionGroupId,
fragmentInstance);
+
+ assertFalse(res.isAccepted());
+ assertEquals(TSStatusCode.REPEATED_RPC_CALL.getStatusCode(),
res.getStatus().getCode());
+ assertFalse(res.isReadNeedRetry());
+
+ // VirtualDataRegion path (FI executed directly through
FragmentInstanceManager)
+ Mockito.when(
+ fragmentInstanceManager.execDataQueryFragmentInstance(
+ fragmentInstance, VirtualDataRegion.getInstance()))
+ .thenThrow(
+ new IoTDBRuntimeException("repeated",
TSStatusCode.REPEATED_RPC_CALL.getStatusCode()));
+
+ res = executor.execute(fragmentInstance);
+
+ assertFalse(res.isAccepted());
+ assertEquals(TSStatusCode.REPEATED_RPC_CALL.getStatusCode(),
res.getStatus().getCode());
+ assertFalse(res.isReadNeedRetry());
+ }
+
@Test
public void testExceptionHappened() throws ConsensusException {