This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b7b6cb9063 Load: Fixed multiple bugs (#17413)
2b7b6cb9063 is described below

commit 2b7b6cb9063f063d50d5eadf651ed83fff0b65ae
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 2 09:37:39 2026 +0800

    Load: Fixed multiple bugs (#17413)
    
    * fix
    
    * load
    
    * Update ClusterConfigTaskExecutor.java
    
    * fix
---
 .../treemodel/auto/basic/IoTDBPipeDataSinkIT.java  | 91 ++++++++++------------
 .../analyze/load/LoadTsFileTableSchemaCache.java   |  4 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  2 +
 .../planner/plan/node/load/LoadTsFileNode.java     |  2 +
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  5 +-
 .../db/storageengine/load/LoadTsFileManager.java   |  4 +-
 .../load/active/ActiveLoadPendingQueue.java        |  4 +-
 .../load/config/LoadTsFileConfigurator.java        |  4 +-
 .../load/memory/LoadTsFileMemoryManager.java       |  3 +-
 .../load/metrics/LoadTsFileCostMetricsSet.java     |  2 +-
 10 files changed, 60 insertions(+), 61 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
index de1acfa32c7..0e8ff890b82 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
@@ -565,59 +565,52 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
 
   @Test
   public void testTransferMods() {
-    try {
-      TestUtils.executeNonQueries(
-          senderEnv,
-          Arrays.asList(
-              "create database root.sg_nonAligned",
-              "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 
with datatype=boolean, encoding=RLE,compressor=snappy",
-              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 
with datatype=int32, encoding=PLAIN,compressor=LZ4",
-              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 
with datatype=int64,encoding=gorilla,compressor=uncompressed",
-              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 
with datatype=float,encoding=chimp,compressor=gzip",
-              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 
with datatype=double,encoding=ts_2diff,compressor=zstd",
-              "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 
with datatype=text,encoding=dictionary,compressor=lzma2",
-              "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, 
s2,s3,s4,s0,s5) 
values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 
device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5
 [...]
-              "flush",
-              "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
-              String.format(
-                  "create pipe test with source 
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
 with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
-                  
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "create database root.sg_nonAligned",
+            "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0 
with datatype=boolean, encoding=RLE,compressor=snappy",
+            "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1 
with datatype=int32, encoding=PLAIN,compressor=LZ4",
+            "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2 
with datatype=int64,encoding=gorilla,compressor=uncompressed",
+            "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3 
with datatype=float,encoding=chimp,compressor=gzip",
+            "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4 
with datatype=double,encoding=ts_2diff,compressor=zstd",
+            "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5 
with datatype=text,encoding=dictionary,compressor=lzma2",
+            "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1, 
s2,s3,s4,s0,s5) 
values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条 
device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.3
 [...]
+            "flush",
+            "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
+            String.format(
+                "create pipe test with source 
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
 with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
+                
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
 
-      TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
-          "count(timeseries),",
-          Collections.singleton("5,"));
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+        "count(timeseries),",
+        Collections.singleton("5,"));
 
-      TestUtils.executeNonQueries(
-          senderEnv, Arrays.asList("drop pipe test_history", "drop pipe 
test_realtime"));
+    TestUtils.executeNonQueries(
+        senderEnv, Arrays.asList("drop pipe test_history", "drop pipe 
test_realtime"));
 
-      TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
+    TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
 
-      TestUtils.executeNonQueries(
-          senderEnv,
-          Arrays.asList(
-              "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
-              String.format(
-                  "create pipe test with source 
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
 with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
-                  
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
+    TestUtils.executeNonQueries(
+        senderEnv,
+        Arrays.asList(
+            "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
+            String.format(
+                "create pipe test with source 
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
 with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
+                
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
 
-      TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
-          "count(timeseries),",
-          Collections.singleton("4,"),
-          15);
-      TestUtils.assertDataAlwaysOnEnv(
-          receiverEnv,
-          "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
-          "count(timeseries),",
-          Collections.singleton("4,"));
-    } finally {
-      TestUtils.executeNonQueries(
-          senderEnv,
-          Arrays.asList(
-              "drop pipe test_history", "drop pipe test_realtime", "drop 
database root.**"));
-    }
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+        "count(timeseries),",
+        Collections.singleton("4,"),
+        15);
+    TestUtils.assertDataAlwaysOnEnv(
+        receiverEnv,
+        "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+        "count(timeseries),",
+        Collections.singleton("4,"));
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index 79ede0f459e..6754468bad5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -387,10 +387,10 @@ public class LoadTsFileTableSchemaCache {
             && (realColumn == null || 
!fileColumn.getType().equals(realColumn.getType()))) {
           LOGGER.debug(
               "Data type mismatch for column {} in table {}, type in TsFile: 
{}, type in IoTDB: {}",
-              realColumn.getName(),
+              fileColumn.getName(),
               realSchema.getTableName(),
               fileColumn.getType(),
-              realColumn.getType());
+              Objects.nonNull(realColumn) ? realColumn.getType() : null);
         }
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b0ebee30ecb..cf404e8d57b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -3514,6 +3514,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
         future.setException(
             new IOException(
                 "The DataNode to be removed is not in the cluster, or the 
input format is incorrect."));
+        return future;
       }
 
       LOGGER.info("Starting to remove DataNode with nodeIds: {}", nodeIds);
@@ -3583,6 +3584,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
         future.setException(
             new IOException(
                 "The ConfigNode to be removed is not in the cluster, or the 
input format is incorrect."));
+        return future;
       }
 
       TConfigNodeLocation configNodeLocation = 
removeConfigNodeLocations.get(0);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index e70bf08fd52..ede0745193a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -152,6 +152,8 @@ public class LoadTsFileNode extends WritePlanNode {
                 statement.isDeleteAfterLoad(),
                 statement.getWritePointCount(i),
                 needDecode4TimeColumn));
+      } else {
+        throw new IllegalStateException("LoadTsFile statement is null during table model split.");
       }
     }
     return res;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 1730fa2d921..bc8446fd3da 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -286,7 +286,10 @@ public class LoadTsFileScheduler implements IScheduler {
         final StringBuilder failedTsFiles =
             new StringBuilder(
                 !tsFileNodeList.isEmpty()
-                    ? tsFileNodeList.get(0).getTsFileResource().getTsFilePath()
+                    ? tsFileNodeList
+                        .get(failedTsFileNodeIndexes.get(0))
+                        .getTsFileResource()
+                        .getTsFilePath()
                     : "");
         final ListIterator<Integer> iterator = 
failedTsFileNodeIndexes.listIterator(1);
         while (iterator.hasNext()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 582fcb8c49c..16c78811d08 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -224,7 +224,7 @@ public class LoadTsFileManager {
       }
     }
 
-    final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
+    final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
     cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
     try {
       final AtomicReference<Exception> exception = new AtomicReference<>();
@@ -304,7 +304,7 @@ public class LoadTsFileManager {
       return false;
     }
 
-    final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
+    final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
     cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
     try {
       uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, 
timePartitionProgressIndexMap);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 7b5f7166197..88c18b19cb6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -70,11 +70,11 @@ public class ActiveLoadPendingQueue {
     return loadingFileSet.contains(file) || pendingFileSet.contains(file);
   }
 
-  public int size() {
+  public synchronized int size() {
     return pendingFileQueue.size() + loadingFileSet.size();
   }
 
-  public boolean isEmpty() {
+  public synchronized boolean isEmpty() {
     return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 8478486781b..4f0156ac0d3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -83,14 +83,14 @@ public class LoadTsFileConfigurator {
 
   public static void validateDatabaseLevelParam(final String databaseLevel) {
     try {
-      int level = Integer.parseInt(databaseLevel);
+      final int level = Integer.parseInt(databaseLevel);
       if (level < DATABASE_LEVEL_MIN_VALUE) {
         throw new SemanticException(
             String.format(
                 "Given database level %d is less than the minimum value %d, 
please input a valid database level.",
                 level, DATABASE_LEVEL_MIN_VALUE));
       }
-    } catch (Exception e) {
+    } catch (final NumberFormatException e) {
       throw new SemanticException(
           String.format(
               "Given database level %s is not a valid integer, please input a 
valid database level.",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index 6142d122714..e08a135d312 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -156,7 +156,7 @@ public class LoadTsFileMemoryManager {
     try {
       forceAllocateFromQuery(bytesNeeded);
       if (LOGGER.isDebugEnabled()) {
-        LOGGER.info(
+        LOGGER.debug(
             "Load: Force resized LoadTsFileMemoryBlock with memory from query 
engine, size added: {}, new size: {}",
             bytesNeeded,
             newSizeInBytes);
@@ -180,7 +180,6 @@ public class LoadTsFileMemoryManager {
       final long actuallyAllocateMemoryInBytes =
           tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
       dataCacheMemoryBlock = new 
LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
-      usedMemorySizeInBytes.addAndGet(actuallyAllocateMemoryInBytes);
       LOGGER.info(
           "Create Data Cache Memory Block {}, allocate memory {}",
           dataCacheMemoryBlock,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index 28bd40c2d29..1ce119131ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -159,7 +159,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
                     stage));
 
     metricService.remove(
-        MetricType.RATE,
+        MetricType.COUNTER,
         Metric.LOAD_DISK_IO.toString(),
         Tag.NAME.toString(),
         
String.valueOf(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()));

Reply via email to