This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ef872f3856d Fix concurrent modification of non-thread-safe data
structures caused by parallel dispatching
ef872f3856d is described below
commit ef872f3856d5c0286f4f18f8a95c01f6e9c637d6
Author: shuwenwei <[email protected]>
AuthorDate: Fri Aug 15 14:19:48 2025 +0800
Fix concurrent modification of non-thread-safe data structures caused by
parallel dispatching
---
.../apache/iotdb/db/queryengine/common/MPPQueryContext.java | 10 +++++-----
.../plan/scheduler/FragmentInstanceDispatcherImpl.java | 9 +++++++--
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 0035f726c4d..fc457ab9ce4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -34,9 +34,8 @@ import org.apache.tsfile.read.filter.basic.Filter;
import java.time.ZoneId;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;
/**
@@ -66,7 +65,8 @@ public class MPPQueryContext {
// When some DataNode cannot be connected, its endPoint will be put
// in this list. And the following retry will avoid planning fragment
// onto this node.
- private final List<TEndPoint> endPointBlackList;
+ // When dispatch FI fails, this structure may be modified concurrently
+ private final Set<TEndPoint> endPointBlackList;
private final TypeProvider typeProvider = new TypeProvider();
@@ -97,7 +97,7 @@ public class MPPQueryContext {
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
- this.endPointBlackList = new LinkedList<>();
+ this.endPointBlackList = ConcurrentHashMap.newKeySet();
this.memoryReservationManager =
new NotThreadSafeMemoryReservationManager(queryId,
this.getClass().getName());
}
@@ -222,7 +222,7 @@ public class MPPQueryContext {
this.endPointBlackList.add(endPoint);
}
- public List<TEndPoint> getEndPointBlackList() {
+ public Set<TEndPoint> getEndPointBlackList() {
return endPointBlackList;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index fce62782abc..162ccf22a75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -146,12 +146,17 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
}
+ FragInstanceDispatchResult failedResult = null;
for (Future<FragInstanceDispatchResult> future : futures) {
+ // Make sure all executing tasks are finished to avoid concurrency
issues
FragInstanceDispatchResult result = future.get();
- if (!result.isSuccessful()) {
- return immediateFuture(result);
+ if (!result.isSuccessful() && failedResult == null) {
+ failedResult = result;
}
}
+ if (failedResult != null) {
+ return immediateFuture(failedResult);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted when dispatching read async", e);