This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d55d5121df Optimize load partition routing (#17863)
4d55d5121df is described below

commit 4d55d5121df3ffab6fef8931daae026b2b046e89
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 12:03:12 2026 +0800

    Optimize load partition routing (#17863)
---
 .../plan/analyze/ClusterPartitionFetcher.java      | 38 +++++-----
 .../plan/node/load/LoadSingleTsFileNode.java       | 36 +++++----
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 88 +++++++++++++---------
 3 files changed, 90 insertions(+), 72 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index 2ccbf0522e8..cba3f22773f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -273,28 +273,28 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
   @Override
   public DataPartition getOrCreateDataPartition(
       final List<DataPartitionQueryParam> dataPartitionQueryParams, final 
String userName) {
-    DataPartition dataPartition;
+    final Map<String, List<DataPartitionQueryParam>> 
splitDataPartitionQueryParams =
+        splitDataPartitionQueryParam(
+            dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(), 
userName);
+    DataPartition dataPartition = 
partitionCache.getDataPartition(splitDataPartitionQueryParams);
+    if (null != dataPartition) {
+      return dataPartition;
+    }
+
     try (final ConfigNodeClient client =
         configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) 
{
-      final Map<String, List<DataPartitionQueryParam>> 
splitDataPartitionQueryParams =
-          splitDataPartitionQueryParam(
-              dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(), 
userName);
-      dataPartition = 
partitionCache.getDataPartition(splitDataPartitionQueryParams);
-
-      if (null == dataPartition) {
-        final TDataPartitionReq req = 
constructDataPartitionReq(splitDataPartitionQueryParams);
-        final TDataPartitionTableResp dataPartitionTableResp =
-            client.getOrCreateDataPartitionTable(req);
+      final TDataPartitionReq req = 
constructDataPartitionReq(splitDataPartitionQueryParams);
+      final TDataPartitionTableResp dataPartitionTableResp =
+          client.getOrCreateDataPartitionTable(req);
 
-        if (dataPartitionTableResp.getStatus().getCode()
-            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          dataPartition = parseDataPartitionResp(dataPartitionTableResp);
-          
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
-        } else {
-          throw new IoTDBRuntimeException(
-              dataPartitionTableResp.getStatus().getMessage(),
-              dataPartitionTableResp.getStatus().getCode());
-        }
+      if (dataPartitionTableResp.getStatus().getCode()
+          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        dataPartition = parseDataPartitionResp(dataPartitionTableResp);
+        
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
+      } else {
+        throw new IoTDBRuntimeException(
+            dataPartitionTableResp.getStatus().getMessage(),
+            dataPartitionTableResp.getStatus().getCode());
       }
     } catch (final ClientManagerException | TException e) {
       throw new StatementAnalyzeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 098b9fde361..be29fabe367 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -96,27 +96,31 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       return true;
     }
 
-    List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
-    resource
-        .getDevices()
-        .forEach(
-            o -> {
-              // iterating the index, must present
-              slotList.add(
-                  new Pair<>(
-                      o, 
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
-              slotList.add(
-                  new Pair<>(
-                      o, 
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
-            });
+    List<Pair<IDeviceID, TTimePartitionSlot>> slotList =
+        new ArrayList<>(resource.getDevices().size() << 1);
+    for (final IDeviceID device : resource.getDevices()) {
+      // iterating the index, must present
+      final TTimePartitionSlot startSlot =
+          
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(device).get());
+      final TTimePartitionSlot endSlot =
+          
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(device).get());
+      slotList.add(new Pair<>(device, startSlot));
+      if (!startSlot.equals(endSlot)) {
+        slotList.add(new Pair<>(device, endSlot));
+      }
+    }
 
     if (slotList.isEmpty()) {
       throw new IllegalStateException(
           String.format("Devices in TsFile %s is empty, this should not happen 
here.", tsFile));
-    } else if (slotList.stream()
-        .anyMatch(slotPair -> 
!slotPair.getRight().equals(slotList.get(0).right))) {
-      needDecodeTsFile = true;
     } else {
+      final TTimePartitionSlot firstSlot = slotList.get(0).right;
+      for (int i = 1, size = slotList.size(); i < size; i++) {
+        if (!slotList.get(i).right.equals(firstSlot)) {
+          needDecodeTsFile = true;
+          return true;
+        }
+      }
       needDecodeTsFile = !isDispatchedToLocal(new 
HashSet<>(partitionFetcher.apply(slotList)));
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 300b8f0eece..6ca3164a3e1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -782,14 +782,29 @@ public class LoadTsFileScheduler implements IScheduler {
         return;
       }
 
+      final List<Pair<IDeviceID, TTimePartitionSlot>> partitionSlotList = new 
ArrayList<>();
+      final int[] chunkPartitionIndexes = new 
int[nonDirectionalChunkData.size()];
+      final Map<IDeviceID, Map<TTimePartitionSlot, Integer>> 
partitionSlotIndexes = new HashMap<>();
+      for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
+        final ChunkData chunkData = nonDirectionalChunkData.get(i);
+        final IDeviceID device = chunkData.getDevice();
+        final TTimePartitionSlot timePartitionSlot = 
chunkData.getTimePartitionSlot();
+        final Map<TTimePartitionSlot, Integer> slotIndexes =
+            partitionSlotIndexes.computeIfAbsent(device, key -> new 
HashMap<>());
+        Integer partitionSlotIndex = slotIndexes.get(timePartitionSlot);
+        if (partitionSlotIndex == null) {
+          partitionSlotIndex = partitionSlotList.size();
+          slotIndexes.put(timePartitionSlot, partitionSlotIndex);
+          partitionSlotList.add(new Pair<>(device, timePartitionSlot));
+        }
+        chunkPartitionIndexes[i] = partitionSlotIndex;
+      }
+
       List<TRegionReplicaSet> replicaSets =
           scheduler.partitionFetcher.queryDataPartition(
-              nonDirectionalChunkData.stream()
-                  .map(data -> new Pair<>(data.getDevice(), 
data.getTimePartitionSlot()))
-                  .collect(Collectors.toList()),
-              scheduler.queryContext.getSession().getUserName());
-      for (int i = 0; i < replicaSets.size(); i++) {
-        final TRegionReplicaSet replicaSet = replicaSets.get(i);
+              partitionSlotList, 
scheduler.queryContext.getSession().getUserName());
+      for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
+        final TRegionReplicaSet replicaSet = 
replicaSets.get(chunkPartitionIndexes[i]);
         final TConsensusGroupId regionId = replicaSet.getRegionId();
         if (regionId2ReplicaSetAndNode.containsKey(regionId)
             && 
!Objects.equals(regionId2ReplicaSetAndNode.get(regionId).getLeft(), 
replicaSet)) {
@@ -864,7 +879,7 @@ public class LoadTsFileScheduler implements IScheduler {
 
     public List<TRegionReplicaSet> queryDataPartition(
         List<Pair<IDeviceID, TTimePartitionSlot>> slotList, String userName) {
-      List<TRegionReplicaSet> replicaSets = new ArrayList<>();
+      List<TRegionReplicaSet> replicaSets = new ArrayList<>(slotList.size());
       int size = slotList.size();
 
       for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
@@ -872,42 +887,41 @@ public class LoadTsFileScheduler implements IScheduler {
             slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
         DataPartition dataPartition =
             fetcher.getOrCreateDataPartition(toQueryParam(subSlotList), 
userName);
-        replicaSets.addAll(
-            subSlotList.stream()
-                .map(
-                    pair ->
-                        // database is an explicit database hint for 
table-model loads and
-                        // pipe-generated tree-model loads.
-                        database != null
-                            ? dataPartition.getDataRegionReplicaSetForWriting(
-                                pair.left, pair.right, database)
-                            : dataPartition.getDataRegionReplicaSetForWriting(
-                                pair.left, pair.right))
-                .collect(Collectors.toList()));
+        for (final Pair<IDeviceID, TTimePartitionSlot> pair : subSlotList) {
+          // database is an explicit database hint for table-model loads and
+          // pipe-generated tree-model loads.
+          replicaSets.add(
+              database != null
+                  ? dataPartition.getDataRegionReplicaSetForWriting(pair.left, 
pair.right, database)
+                  : dataPartition.getDataRegionReplicaSetForWriting(pair.left, 
pair.right));
+        }
       }
       return replicaSets;
     }
 
     private List<DataPartitionQueryParam> toQueryParam(
         List<Pair<IDeviceID, TTimePartitionSlot>> slots) {
-      return slots.stream()
-          .collect(
-              Collectors.groupingBy(
-                  Pair::getLeft, Collectors.mapping(Pair::getRight, 
Collectors.toSet())))
-          .entrySet()
-          .stream()
-          .map(
-              entry -> {
-                DataPartitionQueryParam queryParam =
-                    new DataPartitionQueryParam(entry.getKey(), new 
ArrayList<>(entry.getValue()));
-                // database is an explicit database hint for table-model loads 
and
-                // pipe-generated tree-model loads.
-                if (database != null) {
-                  queryParam.setDatabaseName(database);
-                }
-                return queryParam;
-              })
-          .collect(Collectors.toList());
+      final Map<IDeviceID, Set<TTimePartitionSlot>> device2TimePartitionSlots 
= new HashMap<>();
+      for (final Pair<IDeviceID, TTimePartitionSlot> slot : slots) {
+        device2TimePartitionSlots
+            .computeIfAbsent(slot.left, key -> new HashSet<>())
+            .add(slot.right);
+      }
+
+      final List<DataPartitionQueryParam> queryParams =
+          new ArrayList<>(device2TimePartitionSlots.size());
+      for (final Map.Entry<IDeviceID, Set<TTimePartitionSlot>> entry :
+          device2TimePartitionSlots.entrySet()) {
+        final DataPartitionQueryParam queryParam =
+            new DataPartitionQueryParam(entry.getKey(), new 
ArrayList<>(entry.getValue()));
+        // database is an explicit database hint for table-model loads and
+        // pipe-generated tree-model loads.
+        if (database != null) {
+          queryParam.setDatabaseName(database);
+        }
+        queryParams.add(queryParam);
+      }
+      return queryParams;
     }
   }
 }

Reply via email to