This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2b7b6cb9063 Load: Fixed multiple bugs (#17413)
2b7b6cb9063 is described below
commit 2b7b6cb9063f063d50d5eadf651ed83fff0b65ae
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 2 09:37:39 2026 +0800
Load: Fixed multiple bugs (#17413)
* fix
* load
* Update ClusterConfigTaskExecutor.java
* fix
---
.../treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 91 ++++++++++------------
.../analyze/load/LoadTsFileTableSchemaCache.java | 4 +-
.../config/executor/ClusterConfigTaskExecutor.java | 2 +
.../planner/plan/node/load/LoadTsFileNode.java | 2 +
.../plan/scheduler/load/LoadTsFileScheduler.java | 5 +-
.../db/storageengine/load/LoadTsFileManager.java | 4 +-
.../load/active/ActiveLoadPendingQueue.java | 4 +-
.../load/config/LoadTsFileConfigurator.java | 4 +-
.../load/memory/LoadTsFileMemoryManager.java | 3 +-
.../load/metrics/LoadTsFileCostMetricsSet.java | 2 +-
10 files changed, 60 insertions(+), 61 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
index de1acfa32c7..0e8ff890b82 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
@@ -565,59 +565,52 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualTreeModelAutoIT {
@Test
public void testTransferMods() {
- try {
- TestUtils.executeNonQueries(
- senderEnv,
- Arrays.asList(
- "create database root.sg_nonAligned",
- "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0
with datatype=boolean, encoding=RLE,compressor=snappy",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1
with datatype=int32, encoding=PLAIN,compressor=LZ4",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2
with datatype=int64,encoding=gorilla,compressor=uncompressed",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3
with datatype=float,encoding=chimp,compressor=gzip",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4
with datatype=double,encoding=ts_2diff,compressor=zstd",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5
with datatype=text,encoding=dictionary,compressor=lzma2",
- "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1,
s2,s3,s4,s0,s5)
values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条
device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5
[...]
- "flush",
- "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
- String.format(
- "create pipe test with source
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
-
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "create database root.sg_nonAligned",
+ "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0
with datatype=boolean, encoding=RLE,compressor=snappy",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1
with datatype=int32, encoding=PLAIN,compressor=LZ4",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2
with datatype=int64,encoding=gorilla,compressor=uncompressed",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3
with datatype=float,encoding=chimp,compressor=gzip",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4
with datatype=double,encoding=ts_2diff,compressor=zstd",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5
with datatype=text,encoding=dictionary,compressor=lzma2",
+ "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1,
s2,s3,s4,s0,s5)
values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条
device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.3
[...]
+ "flush",
+ "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
+ String.format(
+ "create pipe test with source
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
- TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
- "count(timeseries),",
- Collections.singleton("5,"));
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+ "count(timeseries),",
+ Collections.singleton("5,"));
- TestUtils.executeNonQueries(
- senderEnv, Arrays.asList("drop pipe test_history", "drop pipe
test_realtime"));
+ TestUtils.executeNonQueries(
+ senderEnv, Arrays.asList("drop pipe test_history", "drop pipe
test_realtime"));
- TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
+ TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
- TestUtils.executeNonQueries(
- senderEnv,
- Arrays.asList(
- "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
- String.format(
- "create pipe test with source
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
-
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
+ String.format(
+ "create pipe test with source
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
- TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
- "count(timeseries),",
- Collections.singleton("4,"),
- 15);
- TestUtils.assertDataAlwaysOnEnv(
- receiverEnv,
- "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
- "count(timeseries),",
- Collections.singleton("4,"));
- } finally {
- TestUtils.executeNonQueries(
- senderEnv,
- Arrays.asList(
- "drop pipe test_history", "drop pipe test_realtime", "drop
database root.**"));
- }
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+ "count(timeseries),",
+ Collections.singleton("4,"),
+ 15);
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+ "count(timeseries),",
+ Collections.singleton("4,"));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index 79ede0f459e..6754468bad5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -387,10 +387,10 @@ public class LoadTsFileTableSchemaCache {
&& (realColumn == null ||
!fileColumn.getType().equals(realColumn.getType()))) {
LOGGER.debug(
"Data type mismatch for column {} in table {}, type in TsFile:
{}, type in IoTDB: {}",
- realColumn.getName(),
+ fileColumn.getName(),
realSchema.getTableName(),
fileColumn.getType(),
- realColumn.getType());
+ Objects.nonNull(realColumn) ? realColumn.getType() : null);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index b0ebee30ecb..cf404e8d57b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -3514,6 +3514,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(
new IOException(
"The DataNode to be removed is not in the cluster, or the
input format is incorrect."));
+ return future;
}
LOGGER.info("Starting to remove DataNode with nodeIds: {}", nodeIds);
@@ -3583,6 +3584,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
future.setException(
new IOException(
"The ConfigNode to be removed is not in the cluster, or the
input format is incorrect."));
+ return future;
}
TConfigNodeLocation configNodeLocation =
removeConfigNodeLocations.get(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index e70bf08fd52..ede0745193a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -152,6 +152,8 @@ public class LoadTsFileNode extends WritePlanNode {
statement.isDeleteAfterLoad(),
statement.getWritePointCount(i),
needDecode4TimeColumn));
+ } else {
+ throw new IllegalStateException("LoadTsFile statement is null during
table model split.");
}
}
return res;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 1730fa2d921..bc8446fd3da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -286,7 +286,10 @@ public class LoadTsFileScheduler implements IScheduler {
final StringBuilder failedTsFiles =
new StringBuilder(
!tsFileNodeList.isEmpty()
- ? tsFileNodeList.get(0).getTsFileResource().getTsFilePath()
+ ? tsFileNodeList
+ .get(failedTsFileNodeIndexes.get(0))
+ .getTsFileResource()
+ .getTsFilePath()
: "");
final ListIterator<Integer> iterator =
failedTsFileNodeIndexes.listIterator(1);
while (iterator.hasNext()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 582fcb8c49c..16c78811d08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -224,7 +224,7 @@ public class LoadTsFileManager {
}
}
- final Optional<CleanupTask> cleanupTask =
Optional.of(uuid2CleanupTask.get(uuid));
+ final Optional<CleanupTask> cleanupTask =
Optional.ofNullable(uuid2CleanupTask.get(uuid));
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
try {
final AtomicReference<Exception> exception = new AtomicReference<>();
@@ -304,7 +304,7 @@ public class LoadTsFileManager {
return false;
}
- final Optional<CleanupTask> cleanupTask =
Optional.of(uuid2CleanupTask.get(uuid));
+ final Optional<CleanupTask> cleanupTask =
Optional.ofNullable(uuid2CleanupTask.get(uuid));
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
try {
uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe,
timePartitionProgressIndexMap);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
index 7b5f7166197..88c18b19cb6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java
@@ -70,11 +70,11 @@ public class ActiveLoadPendingQueue {
return loadingFileSet.contains(file) || pendingFileSet.contains(file);
}
- public int size() {
+ public synchronized int size() {
return pendingFileQueue.size() + loadingFileSet.size();
}
- public boolean isEmpty() {
+ public synchronized boolean isEmpty() {
return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 8478486781b..4f0156ac0d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -83,14 +83,14 @@ public class LoadTsFileConfigurator {
public static void validateDatabaseLevelParam(final String databaseLevel) {
try {
- int level = Integer.parseInt(databaseLevel);
+ final int level = Integer.parseInt(databaseLevel);
if (level < DATABASE_LEVEL_MIN_VALUE) {
throw new SemanticException(
String.format(
"Given database level %d is less than the minimum value %d,
please input a valid database level.",
level, DATABASE_LEVEL_MIN_VALUE));
}
- } catch (Exception e) {
+ } catch (final NumberFormatException e) {
throw new SemanticException(
String.format(
"Given database level %s is not a valid integer, please input a
valid database level.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index 6142d122714..e08a135d312 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -156,7 +156,7 @@ public class LoadTsFileMemoryManager {
try {
forceAllocateFromQuery(bytesNeeded);
if (LOGGER.isDebugEnabled()) {
- LOGGER.info(
+ LOGGER.debug(
"Load: Force resized LoadTsFileMemoryBlock with memory from query
engine, size added: {}, new size: {}",
bytesNeeded,
newSizeInBytes);
@@ -180,7 +180,6 @@ public class LoadTsFileMemoryManager {
final long actuallyAllocateMemoryInBytes =
tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
dataCacheMemoryBlock = new
LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
- usedMemorySizeInBytes.addAndGet(actuallyAllocateMemoryInBytes);
LOGGER.info(
"Create Data Cache Memory Block {}, allocate memory {}",
dataCacheMemoryBlock,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index 28bd40c2d29..1ce119131ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -159,7 +159,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet
{
stage));
metricService.remove(
- MetricType.RATE,
+ MetricType.COUNTER,
Metric.LOAD_DISK_IO.toString(),
Tag.NAME.toString(),
String.valueOf(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()));