This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a03b627a45c Fix: Add Mods data scanning check in scan parser for non-aligned data (#17162)
a03b627a45c is described below
commit a03b627a45cdcce3df195800dc2f93cbbc21bbe5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Feb 5 09:34:41 2026 +0800
Fix: Add Mods data scanning check in scan parser for non-aligned data (#17162)
* Fix: Add Mods data scanning check in scan parser for non-aligned data
This commit fixes an issue where the scan parser did not check
Mods (modifications) data when processing non-aligned time series data.
Previously, only aligned data (VECTOR type) was checked for deletions
via Mods; non-aligned single-measurement data was missing this check.
The fix adds a Mods deletion check to the putValueToColumns method
for the non-aligned data path, ensuring that deleted data points are
properly filtered out during scan operations. This keeps the handling
consistent with aligned data and prevents deleted data from being
processed.
* add IT
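
In essence, the non-aligned branch now performs the same per-point deletion
check the aligned (VECTOR) branch already performs. A minimal annotated sketch
of the added check (the identifiers come from the diff below; the surrounding
method context is assumed):

    // Non-aligned tablets carry a single measurement, so only the first
    // ModsInfo entry can apply to the current point.
    if (!modsInfos.isEmpty()
        && ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0))) {
      // The point's timestamp falls inside a recorded deletion interval:
      // skip it instead of copying it into the output tablet.
      return false;
    }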
---
.../IoTDBPipeTsFileDecompositionWithModsIT.java | 316 +++++++++++++++++++++
.../scan/TsFileInsertionEventScanParser.java | 5 +
2 files changed, 321 insertions(+)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
index 89f1c3fb440..87571e1eb71 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -657,4 +657,320 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends AbstractPipeDualTree
"COUNT(root.sg1.d1.s5),",
Collections.singleton("3591,"));
}
+
+  /**
+   * Test IoTDB pipe handling TsFile decomposition with Mods (modification operations) in tree
+   * model - large scale single point deletion scenario (non-aligned timeseries).
+   *
+   * <p>Test scenario: 1. Create database root.sg1 with 1 device: d1 (non-aligned timeseries with
+   * 5 sensors) 2. Insert 20000 data points for each sensor over different time ranges: s1: time
+   * 1-20000, s2: time 10001-30000, s3: time 20001-40000, s4: time 30001-50000, s5: time
+   * 40001-60000 3. Execute FLUSH to persist data to a TsFile 4. Execute 2000 single point DELETE
+   * operations, each deleting one time point from a sensor 5. Execute FLUSH again 6. Create a
+   * pipe with mods enabled 7. Verify correctness of the receiver data: each sensor should have
+   * 19600 remaining data points, and deleted points should not appear on the receiver
+   *
+   * <p>Test purpose: Verify that the IoTDB pipe correctly handles large scale single point
+   * deletions in a TsFile under the tree model with non-aligned timeseries, ensuring the binary
+   * search optimization in ModsOperationUtil works correctly with many modification entries.
+   */
+  @Test
+  public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletionNonAligned()
+      throws Exception {
+    TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+    TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE root.sg1");
+
+    // Insert 20000 data points for s1 (time 1-20000), batched 50 points per statement
+    String s1 = "INSERT INTO root.sg1.d1(time, s1) VALUES ";
+    StringBuilder insertBuilder1 = new StringBuilder(s1);
+    for (int i = 1; i <= 20000; i++) {
+      insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder1.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
+        insertBuilder1 = new StringBuilder(s1);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder1.length() > s1.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
+    }
+
+    // Insert 20000 data points for s2 (time 10001-30000)
+    String s2 = "INSERT INTO root.sg1.d1(time, s2) VALUES ";
+    StringBuilder insertBuilder2 = new StringBuilder(s2);
+    for (int i = 10001; i <= 30000; i++) {
+      insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder2.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
+        insertBuilder2 = new StringBuilder(s2);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder2.length() > s2.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
+    }
+
+    // Insert 20000 data points for s3 (time 20001-40000)
+    String s3 = "INSERT INTO root.sg1.d1(time, s3) VALUES ";
+    StringBuilder insertBuilder3 = new StringBuilder(s3);
+    for (int i = 20001; i <= 40000; i++) {
+      insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder3.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
+        insertBuilder3 = new StringBuilder(s3);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder3.length() > s3.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
+    }
+
+    // Insert 20000 data points for s4 (time 30001-50000)
+    String s4 = "INSERT INTO root.sg1.d1(time, s4) VALUES ";
+    StringBuilder insertBuilder4 = new StringBuilder(s4);
+    for (int i = 30001; i <= 50000; i++) {
+      insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder4.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
+        insertBuilder4 = new StringBuilder(s4);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder4.length() > s4.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
+    }
+
+    // Insert 20000 data points for s5 (time 40001-60000)
+    String s5 = "INSERT INTO root.sg1.d1(time, s5) VALUES ";
+    StringBuilder insertBuilder5 = new StringBuilder(s5);
+    for (int i = 40001; i <= 60000; i++) {
+      insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder5.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());
+        insertBuilder5 = new StringBuilder(s5);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder5.length() > s5.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());
+    }
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    // Execute 2000 single point DELETE operations:
+    // delete 400 points from each sensor (distributed across different time ranges)
+    for (int i = 0; i < 400; i++) {
+      // Delete from s1: every 10th point starting from 10
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time = " + (10 + i * 10));
+
+      // Delete from s2: every 10th point starting from 10010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s2 WHERE time = " + (10010 + i * 10));
+
+      // Delete from s3: every 10th point starting from 20010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s3 WHERE time = " + (20010 + i * 10));
+
+      // Delete from s4: every 10th point starting from 30010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s4 WHERE time = " + (30010 + i * 10));
+
+      // Delete from s5: every 10th point starting from 40010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s5 WHERE time = " + (40010 + i * 10));
+    }
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    // Verify sender data integrity before creating the pipe to avoid leader election issues.
+    // This ensures all data is properly persisted and consistent on the sender side
+    // before starting the pipe synchronization process.
+    TestUtils.assertDataEventuallyOnEnv(
+        senderEnv,
+        "SELECT COUNT(**) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,19600,19600,19600,19600,"));
+
+    TestUtils.executeNonQueryWithRetry(
+        senderEnv,
+        String.format(
+            "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+            receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+    // Verify total count of all sensors using COUNT(**)
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(**) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,19600,19600,19600,19600,"));
+
+    // Verify individual sensor counts
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("19600,"));
+
+    // Verify the deleted time ranges using COUNT with a WHERE clause.
+    // These should return 0 since every multiple of 10 in these ranges was deleted
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 14000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 24000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 34000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 44000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("0,"));
+
+    // Verify counts in non-deleted time ranges using multiple range queries.
+    // Check ranges before the deletion area
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 1 AND time < 10",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10001 AND time < 10010",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20001 AND time < 20010",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30001 AND time < 30010",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40001 AND time < 40010",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("9,"));
+
+    // Check ranges after the deletion area
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time > 4000 AND time <= 20000",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time > 14000 AND time <= 30000",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time > 24000 AND time <= 40000",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time > 34000 AND time <= 50000",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time > 44000 AND time <= 60000",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("16000,"));
+
+    // Check the non-deleted points within the deletion range: timestamps that are not
+    // multiples of 10 were never deleted, so each range keeps 3991 - 400 = 3591 points
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 14000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 24000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 34000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 44000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("3591,"));
+  }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 5c9985fe748..99d35f7eb98 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -426,6 +426,11 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
}
}
} else {
+      if (!modsInfos.isEmpty()
+          && ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0))) {
+        return false;
+      }
+
isNeedFillTime = true;
switch (tablet.getSchemas().get(0).getType()) {
case BOOLEAN:
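
For context on the "binary search optimization in ModsOperationUtil" that the
new IT exercises: with thousands of modification entries, a per-point deletion
lookup is typically a binary search over sorted, non-overlapping deletion
intervals rather than a linear scan. A minimal self-contained sketch of that
idea (hypothetical types and names; the actual ModsOperationUtil API is not
shown in this diff):

    /** Hypothetical lookup over sorted, disjoint deletion intervals. */
    final class DeletionIntervals {
      private final long[] starts; // sorted ascending
      private final long[] ends; // ends[i] >= starts[i], intervals disjoint

      DeletionIntervals(final long[] starts, final long[] ends) {
        this.starts = starts;
        this.ends = ends;
      }

      /** Returns true iff some interval [starts[i], ends[i]] contains time. */
      boolean isDeleted(final long time) {
        int lo = 0;
        int hi = starts.length - 1;
        while (lo <= hi) {
          final int mid = (lo + hi) >>> 1;
          if (time < starts[mid]) {
            hi = mid - 1;
          } else if (time > ends[mid]) {
            lo = mid + 1;
          } else {
            return true; // starts[mid] <= time <= ends[mid]
          }
        }
        return false;
      }
    }

With 2000 single-point deletions, as in the IT above, each of the roughly
100000 scanned points then costs about log2(2000) ≈ 11 comparisons instead of
up to 2000.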