This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a03b627a45c Fix: Add Mods data scanning check in scan parser for 
non-aligned data (#17162)
a03b627a45c is described below

commit a03b627a45cdcce3df195800dc2f93cbbc21bbe5
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Feb 5 09:34:41 2026 +0800

    Fix: Add Mods data scanning check in scan parser for non-aligned data 
(#17162)
    
    * Fix: Add Mods data scanning check in scan parser for non-aligned data
    
    This commit fixes the issue where the scan parser was not checking
    Mods (modifications) data when processing non-aligned time series data.
    Previously, only aligned data (VECTOR type) was checked for deletions
    via Mods, but non-aligned single measurement data was missing this check.
    
    The fix adds a Mods deletion check in the putValueToColumns method
    for non-aligned data paths, ensuring that deleted data points are
    properly filtered out during scan operations. This maintains consistency
    with the aligned data handling and prevents deleted data from being
    processed.
    
    * add IT
---
 .../IoTDBPipeTsFileDecompositionWithModsIT.java    | 316 +++++++++++++++++++++
 .../scan/TsFileInsertionEventScanParser.java       |   5 +
 2 files changed, 321 insertions(+)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
index 89f1c3fb440..87571e1eb71 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -657,4 +657,320 @@ public class IoTDBPipeTsFileDecompositionWithModsIT 
extends AbstractPipeDualTree
         "COUNT(root.sg1.d1.s5),",
         Collections.singleton("3591,"));
   }
+
+  /**
+   * Test IoTDB pipe handling TsFile decomposition with Mods (modification 
operations) in tree model
+   * - Large scale single point deletion scenario (non-aligned timeseries)
+   *
+   * <p>Test scenario: 1. Create database root.sg1 with 1 device: d1 
(non-aligned timeseries with 5
+   * sensors) 2. Insert 20000 data points for each sensor with different time 
ranges: - s1: time
+   * 1-20000 - s2: time 10001-30000 - s3: time 20001-40000 - s4: time 
30001-50000 - s5: time
+   * 40001-60000 3. Execute FLUSH operation to persist data to TsFile 4. 
Execute 2000 single point
+   * DELETE operations, each deleting one time point from different sensors 5. 
Execute FLUSH
+   * operation again 6. Create pipe with mods enabled 7. Verify correctness of 
receiver data: - Each
+   * sensor should have 19600 remaining data points - Deleted points should 
not appear in receiver
+   *
+   * <p>Test purpose: Verify that IoTDB pipe can correctly handle large scale 
single point deletion
+   * operations in TsFile under tree model with non-aligned timeseries, 
ensuring the binary search
+   * optimization in ModsOperationUtil works correctly with many modification 
entries.
+   */
+  @Test
+  public void 
testTsFileDecompositionWithModsLargeScaleSinglePointDeletionNonAligned()
+      throws Exception {
+    TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
+    TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE 
root.sg1");
+
+    // Insert 20000 data points for s1 (time 1-20000)
+    String s1 = "INSERT INTO root.sg1.d1(time, s1) VALUES ";
+    StringBuilder insertBuilder1 = new StringBuilder(s1);
+    for (int i = 1; i <= 20000; i++) {
+      
insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder1.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder1.toString());
+        insertBuilder1 = new StringBuilder(s1);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder1.length() > s1.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
+    }
+
+    // Insert 20000 data points for s2 (time 10001-30000)
+    String s2 = "INSERT INTO root.sg1.d1(time, s2) VALUES ";
+    StringBuilder insertBuilder2 = new StringBuilder(s2);
+    for (int i = 10001; i <= 30000; i++) {
+      
insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder2.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder2.toString());
+        insertBuilder2 = new StringBuilder(s2);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder2.length() > s2.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
+    }
+
+    // Insert 20000 data points for s3 (time 20001-40000)
+    String s3 = "INSERT INTO root.sg1.d1(time, s3) VALUES ";
+    StringBuilder insertBuilder3 = new StringBuilder(s3);
+    for (int i = 20001; i <= 40000; i++) {
+      
insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder3.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder3.toString());
+        insertBuilder3 = new StringBuilder(s3);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder3.length() > s3.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
+    }
+
+    // Insert 20000 data points for s4 (time 30001-50000)
+    String s4 = "INSERT INTO root.sg1.d1(time, s4) VALUES ";
+    StringBuilder insertBuilder4 = new StringBuilder(s4);
+    for (int i = 30001; i <= 50000; i++) {
+      
insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder4.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder4.toString());
+        insertBuilder4 = new StringBuilder(s4);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder4.length() > s4.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
+    }
+
+    // Insert 20000 data points for s5 (time 40001-60000)
+    String s5 = "INSERT INTO root.sg1.d1(time, s5) VALUES ";
+    StringBuilder insertBuilder5 = new StringBuilder(s5);
+    for (int i = 40001; i <= 60000; i++) {
+      
insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
+      if (i % 50 != 0) {
+        insertBuilder5.append(",");
+      } else {
+        TestUtils.executeNonQueryWithRetry(senderEnv, 
insertBuilder5.toString());
+        insertBuilder5 = new StringBuilder(s5);
+      }
+    }
+    // Execute remaining data if any
+    if (insertBuilder5.length() > s5.length()) {
+      TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());
+    }
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    // Execute 2000 single point DELETE operations
+    // Delete 400 points from each sensor (distributed across different time 
ranges)
+    for (int i = 0; i < 400; i++) {
+      // Delete from s1: every 10th point starting from 10
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time = " + (10 + i * 
10));
+
+      // Delete from s2: every 10th point starting from 10010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s2 WHERE time = " + (10010 + i * 
10));
+
+      // Delete from s3: every 10th point starting from 20010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s3 WHERE time = " + (20010 + i * 
10));
+
+      // Delete from s4: every 10th point starting from 30010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s4 WHERE time = " + (30010 + i * 
10));
+
+      // Delete from s5: every 10th point starting from 40010
+      TestUtils.executeNonQueryWithRetry(
+          senderEnv, "DELETE FROM root.sg1.d1.s5 WHERE time = " + (40010 + i * 
10));
+    }
+
+    TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");
+
+    // Verify sender data integrity before creating pipe to avoid leader 
election issues
+    // This ensures all data is properly persisted and consistent on sender 
side
+    // before starting the pipe synchronization process
+    TestUtils.assertDataEventuallyOnEnv(
+        senderEnv,
+        "SELECT COUNT(**) FROM root.sg1.d1",
+        
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,19600,19600,19600,19600,"));
+
+    executeNonQueryWithRetry(
+        senderEnv,
+        String.format(
+            "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH 
CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
+            receiverEnv.getDataNodeWrapperList().get(0).getIp(),
+            receiverEnv.getDataNodeWrapperList().get(0).getPort()));
+
+    // Verify total count of all sensors using COUNT(**)
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(**) FROM root.sg1.d1",
+        
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,19600,19600,19600,19600,"));
+
+    // Verify individual sensor counts
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("19600,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("19600,"));
+
+    // Verify count of deleted time points using COUNT with WHERE clause
+    // These should return 0 since every multiple-of-10 time point in these ranges was deleted
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 
AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 
14000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 
24000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 
34000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("0,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 
44000 AND time % 10 = 0",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("0,"));
+
+    // Verify count of non-deleted time ranges using multiple range queries
+    // Check ranges before deletion area
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 1 AND time < 10",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10001 AND time < 
10010",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20001 AND time < 
20010",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30001 AND time < 
30010",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("9,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40001 AND time < 
40010",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("9,"));
+
+    // Check ranges after deletion area
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time > 4000 AND time <= 
20000",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time > 14000 AND time <= 
30000",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time > 24000 AND time <= 
40000",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time > 34000 AND time <= 
50000",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("16000,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time > 44000 AND time <= 
60000",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("16000,"));
+
+    // Check points within the deletion range whose time is not a multiple of 10
+    // (these were never deleted, so they should all remain)
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 
AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s1),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 
14000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s2),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 
24000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s3),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 
34000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s4),",
+        Collections.singleton("3591,"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 
44000 AND time % 10 != 0",
+        "COUNT(root.sg1.d1.s5),",
+        Collections.singleton("3591,"));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 5c9985fe748..99d35f7eb98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -426,6 +426,11 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
         }
       }
     } else {
+      if (!modsInfos.isEmpty()
+          && ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0))) 
{
+        return false;
+      }
+
       isNeedFillTime = true;
       switch (tablet.getSchemas().get(0).getType()) {
         case BOOLEAN:

Reply via email to