This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 958ce45b137 Fix repeated RPC dispatch reusing a released FragmentInstanceContext (NPE) (#17794)
958ce45b137 is described below

commit 958ce45b13722f04bbc3837ab76caad207e15216
Author: Jackie Tien <[email protected]>
AuthorDate: Mon Jun 1 10:39:38 2026 +0800

    Fix repeated RPC dispatch reusing a released FragmentInstanceContext (NPE) (#17794)
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../execution/executor/RegionReadExecutor.java     | 23 ++++++-
 .../fragment/FragmentInstanceManager.java          | 79 ++++++++++++++++------
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 15 ++++
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |  2 +-
 .../execution/executor/RegionReadExecutorTest.java | 46 +++++++++++++
 6 files changed, 141 insertions(+), 25 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index e5014681fa7..035c648132f 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -146,6 +146,7 @@ public enum TSStatusCode {
   QUERY_TIMEOUT(720),
   PLAN_FAILED_NETWORK_PARTITION(721),
   CANNOT_FETCH_FI_STATE(722),
+  REPEATED_RPC_CALL(723),
 
   // OBJECT
   OBJECT_NOT_EXISTS(740),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index b393919cf77..07c1574e2aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -127,6 +128,13 @@ public class RegionReadExecutor {
           || t instanceof InterruptedException) {
         resp.setReadNeedRetry(true);
         resp.setStatus(new 
TSStatus(TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()));
+      } else if (t instanceof IoTDBRuntimeException) {
+        // Carry the original status code (e.g. REPEATED_RPC_CALL) back to the 
dispatcher so it is
+        // not downgraded to EXECUTE_STATEMENT_ERROR; needRetryHelper decides 
retryability.
+        TSStatus status = new TSStatus(((IoTDBRuntimeException) 
t).getErrorCode());
+        status.setMessage(t.getMessage());
+        resp.setStatus(status);
+        resp.setReadNeedRetry(StatusUtils.needRetryHelper(status));
       }
       return resp;
     }
@@ -156,8 +164,19 @@ public class RegionReadExecutor {
       }
     } catch (Throwable t) {
       
LOGGER.warn(DataNodeQueryMessages.EXECUTE_FRAGMENTINSTANCE_IN_QUERYEXECUTOR_FAILED,
 t);
-      return RegionExecutionResult.create(
-          false, String.format(ERROR_MSG_FORMAT, t.getMessage()), null);
+      RegionExecutionResult resp =
+          RegionExecutionResult.create(
+              false, String.format(ERROR_MSG_FORMAT, t.getMessage()), null);
+      Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(t);
+      if (rootCause instanceof IoTDBRuntimeException) {
+        // Carry the original status code (e.g. REPEATED_RPC_CALL) back to the 
dispatcher so it is
+        // not downgraded to EXECUTE_STATEMENT_ERROR; needRetryHelper decides 
retryability.
+        TSStatus status = new TSStatus(((IoTDBRuntimeException) 
rootCause).getErrorCode());
+        status.setMessage(rootCause.getMessage());
+        resp.setStatus(status);
+        resp.setReadNeedRetry(StatusUtils.needRetryHelper(status));
+      }
+      return resp;
     }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 37e5c6c0858..e5f3cbc7e0d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -147,23 +147,30 @@ public class FragmentInstanceManager {
                 FragmentInstanceStateMachine stateMachine =
                     new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
 
-                int dataNodeFINum = instance.getDataNodeFINum();
-                DataNodeQueryContext dataNodeQueryContext =
-                    getOrCreateDataNodeQueryContext(instanceId.getQueryId(), 
dataNodeFINum);
-
+                boolean[] contextCreated = new boolean[] {false};
+                DataNodeQueryContext[] dataNodeQueryContexts = new 
DataNodeQueryContext[1];
                 FragmentInstanceContext context =
                     instanceContext.computeIfAbsent(
                         instanceId,
-                        fragmentInstanceId ->
-                            createFragmentInstanceContext(
-                                fragmentInstanceId,
-                                stateMachine,
-                                instance.getSessionInfo(),
-                                dataRegion,
-                                instance.getGlobalTimePredicate(),
-                                dataNodeQueryContextMap,
-                                instance.isDebug(),
-                                instance.isVerbose()));
+                        fragmentInstanceId -> {
+                          contextCreated[0] = true;
+                          // Only ensure the DataNodeQueryContext when we 
actually create the
+                          // FragmentInstanceContext, so the repeated-dispatch 
path (which rejects
+                          // without creating a context) does not leak a 
context entry.
+                          dataNodeQueryContexts[0] =
+                              getOrCreateDataNodeQueryContext(
+                                  instanceId.getQueryId(), 
instance.getDataNodeFINum());
+                          return createFragmentInstanceContext(
+                              fragmentInstanceId,
+                              stateMachine,
+                              instance.getSessionInfo(),
+                              dataRegion,
+                              instance.getGlobalTimePredicate(),
+                              dataNodeQueryContextMap,
+                              instance.isDebug(),
+                              instance.isVerbose());
+                        });
+                rejectIfRepeatedDispatch(contextCreated[0], instanceId);
                 context.setHighestPriority(instance.isHighestPriority());
 
                 try {
@@ -172,7 +179,7 @@ public class FragmentInstanceManager {
                           instance.getFragment().getPlanNodeTree(),
                           instance.getFragment().getTypeProvider(),
                           context,
-                          dataNodeQueryContext);
+                          dataNodeQueryContexts[0]);
 
                   List<IDriver> drivers = new ArrayList<>();
                   driverFactories.forEach(factory -> 
drivers.add(factory.createDriver()));
@@ -246,6 +253,30 @@ public class FragmentInstanceManager {
     }
   }
 
+  /**
+   * If {@code instanceContext.computeIfAbsent} returned an existing {@link 
FragmentInstanceContext}
+   * for this {@code instanceId} (i.e. {@code contextCreated} is false), the 
same FragmentInstance
+   * has been dispatched before (e.g. an RPC retry in {@code
+   * FragmentInstanceDispatcherImpl#dispatchRemote}). The previous execution 
may have already
+   * released its resources (dataRegion == null), so reusing this cached 
context would run a fresh
+   * driver against a released context and trigger an NPE. Reject the 
duplicated dispatch with
+   * REPEATED_RPC_CALL instead of reusing it.
+   *
+   * <p>This must be called before the planning try block on purpose, so it 
propagates up
+   * (RegionReadExecutor carries the status code) without touching the first 
execution's cached
+   * resources.
+   */
+  private static void rejectIfRepeatedDispatch(
+      boolean contextCreated, FragmentInstanceId instanceId) {
+    if (!contextCreated) {
+      throw new IoTDBRuntimeException(
+          String.format(
+              "Repeated RPC call detected for FragmentInstance %s, reject the 
duplicated dispatch.",
+              instanceId.getFullId()),
+          TSStatusCode.REPEATED_RPC_CALL.getStatusCode());
+    }
+  }
+
   private void clearFIRelatedResources(FragmentInstanceId instanceId) {
     // close and remove all the handles of the fragment instance
     exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift());
@@ -270,16 +301,20 @@ public class FragmentInstanceManager {
               FragmentInstanceStateMachine stateMachine =
                   new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
 
+              boolean[] contextCreated = new boolean[] {false};
               FragmentInstanceContext context =
                   instanceContext.computeIfAbsent(
                       instanceId,
-                      fragmentInstanceId ->
-                          createFragmentInstanceContext(
-                              fragmentInstanceId,
-                              stateMachine,
-                              instance.getSessionInfo(),
-                              instance.isDebug(),
-                              instance.isVerbose()));
+                      fragmentInstanceId -> {
+                        contextCreated[0] = true;
+                        return createFragmentInstanceContext(
+                            fragmentInstanceId,
+                            stateMachine,
+                            instance.getSessionInfo(),
+                            instance.isDebug(),
+                            instance.isVerbose());
+                      });
+              rejectIfRepeatedDispatch(contextCreated[0], instanceId);
               context.setHighestPriority(instance.isHighestPriority());
 
               try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index aa5b8350f87..4b3165a9ed5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult;
@@ -589,6 +590,20 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
           "can't execute request on node {}, error msg is {}, and we try to 
reconnect this node.",
           endPoint,
           ExceptionUtils.getRootCause(e).toString());
+      // If the query has already timed out, do not retry. Re-dispatching the 
same FragmentInstance
+      // may cause it to be executed twice on the remote node (see the 
REPEATED_RPC_CALL handling in
+      // FragmentInstanceManager), so we fail fast with a timeout status 
instead.
+      long currentTime = System.currentTimeMillis();
+      if (currentTime - queryContext.getStartTime() >= 
queryContext.getTimeOut()) {
+        throw new FragmentInstanceDispatchException(
+            RpcUtils.getStatus(
+                TSStatusCode.QUERY_TIMEOUT,
+                String.format(
+                    
QueryTimeoutRuntimeException.QUERY_TIMEOUT_EXCEPTION_MESSAGE,
+                    queryContext.getStartTime(),
+                    queryContext.getStartTime() + queryContext.getTimeOut(),
+                    currentTime)));
+      }
       // we just retry once to clear stale connection for a restart node.
       try {
         dispatchRemoteHelper(instance, endPoint);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 533fa8ab640..103549a9cc7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -160,7 +160,7 @@ public class ErrorHandlingUtils {
 
     Throwable t = e instanceof ExecutionException ? e.getCause() : e;
     if (t instanceof QueryTimeoutRuntimeException) {
-      return RpcUtils.getStatus(TSStatusCode.INTERNAL_REQUEST_TIME_OUT, 
rootCause.getMessage());
+      return RpcUtils.getStatus(TSStatusCode.QUERY_TIMEOUT, 
rootCause.getMessage());
     } else if (t instanceof ParseCancellationException) {
       return RpcUtils.getStatus(
           TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR + 
rootCause.getMessage());
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
index 016fe90b460..b0b5fb805a3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
@@ -30,6 +31,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -161,6 +163,50 @@ public class RegionReadExecutorTest {
     assertEquals(String.format(ERROR_MSG_FORMAT, "schema-exception"), 
res.getMessage());
   }
 
+  @Test
+  public void testRepeatedRpcCall() throws ConsensusException {
+    // A repeated RPC dispatch of the same FragmentInstance makes 
FragmentInstanceManager throw
+    // IoTDBRuntimeException(REPEATED_RPC_CALL). RegionReadExecutor must carry 
that status code back
+    // (instead of dropping it, which would downgrade it to 
EXECUTE_STATEMENT_ERROR) and mark the
+    // result as non-retryable.
+    ConsensusGroupId dataRegionGroupId = new DataRegionId(1);
+    FragmentInstanceId fragmentInstanceId =
+        new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
+    FragmentInstance fragmentInstance = Mockito.mock(FragmentInstance.class);
+    Mockito.when(fragmentInstance.getId()).thenReturn(fragmentInstanceId);
+
+    IConsensus dataRegionConsensus = Mockito.mock(IConsensus.class);
+    IConsensus schemaRegionConsensus = Mockito.mock(IConsensus.class);
+    FragmentInstanceManager fragmentInstanceManager = 
Mockito.mock(FragmentInstanceManager.class);
+
+    RegionReadExecutor executor =
+        new RegionReadExecutor(dataRegionConsensus, schemaRegionConsensus, 
fragmentInstanceManager);
+
+    // consensus read path (covers both data and schema region queries)
+    Mockito.when(dataRegionConsensus.read(dataRegionGroupId, fragmentInstance))
+        .thenThrow(
+            new IoTDBRuntimeException("repeated", 
TSStatusCode.REPEATED_RPC_CALL.getStatusCode()));
+
+    RegionExecutionResult res = executor.execute(dataRegionGroupId, 
fragmentInstance);
+
+    assertFalse(res.isAccepted());
+    assertEquals(TSStatusCode.REPEATED_RPC_CALL.getStatusCode(), 
res.getStatus().getCode());
+    assertFalse(res.isReadNeedRetry());
+
+    // VirtualDataRegion path (FI executed directly through 
FragmentInstanceManager)
+    Mockito.when(
+            fragmentInstanceManager.execDataQueryFragmentInstance(
+                fragmentInstance, VirtualDataRegion.getInstance()))
+        .thenThrow(
+            new IoTDBRuntimeException("repeated", 
TSStatusCode.REPEATED_RPC_CALL.getStatusCode()));
+
+    res = executor.execute(fragmentInstance);
+
+    assertFalse(res.isAccepted());
+    assertEquals(TSStatusCode.REPEATED_RPC_CALL.getStatusCode(), 
res.getStatus().getCode());
+    assertFalse(res.isReadNeedRetry());
+  }
+
   @Test
   public void testExceptionHappened() throws ConsensusException {
 

Reply via email to