This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch 
fixConcurrentModificationCausedByParallelDispatch-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2df09f68cca1fcf58c6e488dffb05992ce2ad34c
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 29 16:59:15 2025 +0800

    Fix concurrent modification of non-thread-safe data structures caused by 
parallel dispatching
---
 .../apache/iotdb/db/queryengine/common/MPPQueryContext.java    | 10 +++++-----
 .../plan/scheduler/FragmentInstanceDispatcherImpl.java         |  9 +++++++--
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 63970762de7..84f7c7cd26f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -34,9 +34,8 @@ import org.apache.tsfile.read.filter.basic.Filter;
 
 import java.time.ZoneId;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This class is used to record the context of a query including QueryId, 
query statement, session
@@ -65,7 +64,8 @@ public class MPPQueryContext {
   // When some DataNode cannot be connected, its endPoint will be put
   // in this list. And the following retry will avoid planning fragment
   // onto this node.
-  private final List<TEndPoint> endPointBlackList;
+  // When dispatch FI fails, this structure may be modified concurrently
+  private final Set<TEndPoint> endPointBlackList;
 
   private final TypeProvider typeProvider = new TypeProvider();
 
@@ -92,7 +92,7 @@ public class MPPQueryContext {
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
-    this.endPointBlackList = new LinkedList<>();
+    this.endPointBlackList = ConcurrentHashMap.newKeySet();
     this.memoryReservationManager =
         new NotThreadSafeMemoryReservationManager(queryId, 
this.getClass().getName());
   }
@@ -189,7 +189,7 @@ public class MPPQueryContext {
     this.endPointBlackList.add(endPoint);
   }
 
-  public List<TEndPoint> getEndPointBlackList() {
+  public Set<TEndPoint> getEndPointBlackList() {
     return endPointBlackList;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index fce62782abc..162ccf22a75 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -146,12 +146,17 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
             
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
         futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
       }
+      FragInstanceDispatchResult failedResult = null;
       for (Future<FragInstanceDispatchResult> future : futures) {
+        // Make sure all executing tasks are finished to avoid concurrency 
issues
         FragInstanceDispatchResult result = future.get();
-        if (!result.isSuccessful()) {
-          return immediateFuture(result);
+        if (!result.isSuccessful() && failedResult == null) {
+          failedResult = result;
         }
       }
+      if (failedResult != null) {
+        return immediateFuture(failedResult);
+      }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.warn("Interrupted when dispatching read async", e);

Reply via email to