This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch run-ci-mod
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/run-ci-mod by this push:
new 6e163e752dd may-catch-bug
6e163e752dd is described below
commit 6e163e752dd4ebe37553d9ce306f22c32d6d1d12
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 27 16:34:42 2026 +0800
may-catch-bug
---
.../treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 64 +++++++++++-----------
.../scan/TsFileInsertionEventScanParser.java | 5 +-
.../db/storageengine/dataregion/DataRegion.java | 5 +-
3 files changed, 38 insertions(+), 36 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
index d7794b12c7b..212eff5d879 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
@@ -579,43 +579,43 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualTreeModelAutoIT {
@Test
public void testTransferMods() {
- TestUtils.executeNonQueries(
- senderEnv,
- Arrays.asList(
- "create database root.sg_nonAligned",
- "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0
with datatype=boolean, encoding=RLE,compressor=snappy",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1
with datatype=int32, encoding=PLAIN,compressor=LZ4",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2
with datatype=int64,encoding=gorilla,compressor=uncompressed",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3
with datatype=float,encoding=chimp,compressor=gzip",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4
with datatype=double,encoding=ts_2diff,compressor=zstd",
- "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5
with datatype=text,encoding=dictionary,compressor=lzma2",
- "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1,
s2,s3,s4,s0,s5)
values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条
device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5.3
[...]
- "flush",
- "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
- String.format(
- "create pipe test with source
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
-
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
+ try {
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "create database root.sg_nonAligned",
+ "create TIMESERIES root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s0
with datatype=boolean, encoding=RLE,compressor=snappy",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s1
with datatype=int32, encoding=PLAIN,compressor=LZ4",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s2
with datatype=int64,encoding=gorilla,compressor=uncompressed",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s3
with datatype=float,encoding=chimp,compressor=gzip",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s4
with datatype=double,encoding=ts_2diff,compressor=zstd",
+ "create timeseries root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`.s5
with datatype=text,encoding=dictionary,compressor=lzma2",
+ "insert into root.sg_nonAligned.`非对齐序列带有encoding和压缩方式`(time,s1,
s2,s3,s4,s0,s5)
values(1,1,10,5.39,5.51234,true,''),(11,null,20,5.39,15.51234,false,'第2条
device_nonAligned'),(21,3,null,5.39,25.51234,true,'第3条device_nonAligned'),(31,4,40,null,35.51234,true,'第4条device_nonAligned'),(41,5,50,5.39,null,false,'第5条device_nonAligned'),(51,6,60,5.39,55.51234,null,'第6条device_nonAligned'),(61,7,70,5.39,65.51234,false,null),(71,8,80,5.39,75.51234,false,'第8条device_nonAligned'),(81,9,90,5
[...]
+ "flush",
+ "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s0",
+ String.format(
+ "create pipe test with source
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
- TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
- "count(timeseries),",
- Collections.singleton("5,"));
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
+ "count(timeseries),",
+ Collections.singleton("5,"));
- TestUtils.executeNonQueries(
- senderEnv, Arrays.asList("drop pipe test_history", "drop pipe
test_realtime"));
+ TestUtils.executeNonQueries(
+ senderEnv, Arrays.asList("drop pipe test_history", "drop pipe
test_realtime"));
- TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
+ TestUtils.executeNonQuery(receiverEnv, "drop database root.**");
- TestUtils.executeNonQueries(
- senderEnv,
- Arrays.asList(
- "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
- String.format(
- "create pipe test with source
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
-
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "delete timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.s1",
+ String.format(
+ "create pipe test with source
('source.realtime.mode'='stream','inclusion'='data','path'='root.sg_nonAligned.非对齐序列带有encoding和压缩方式.**','source.realtime.enable'='true','mods.enable'='true')
with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())));
- try {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"count timeseries root.sg_nonAligned.非对齐序列带有encoding和压缩方式.*",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index b428e9b8c0a..da0be745458 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -687,10 +687,11 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
return true;
} else if (chunkHeader.getMeasurementID().equals("s1")) {
LOGGER.warn(
- "s1 not filtered, currentModifications: {}, modFile: {}",
+ "s1 not filtered, currentModifications: {}, modFile: {}, file
path: {}",
currentModifications.getDeviceOverlapped(new
PartialPath("root.**")),
new ModificationFile(((PipeTsFileInsertionEvent)
sourceEvent).getModFile(), false)
- .getAllMods());
+ .getAllMods(),
+ tsFileSequenceReader.getFileName());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index eb37cb1d5d2..db551407985 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3433,9 +3433,10 @@ public class DataRegion implements IDataRegionForQuery {
}
}
logger.info(
- "[Deletion] Deletion {} is written into {} mod files",
+ "[Deletion] Deletion {} is written into {} mod files, file path: {}",
deletion,
- involvedModificationFiles.size());
+ involvedModificationFiles.size(),
+ involvedModificationFiles);
}
private boolean isFileFullyMatchedByTime(