This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f1b45088029 Pipe: Fix tablet event alignment and row count handling (#17999)
f1b45088029 is described below

commit f1b4508802980f1f09aca3dd1ac204b1692a9873
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 16:06:25 2026 +0800

    Pipe: Fix tablet event alignment and row count handling (#17999)
    
    * Pipe: Fix tablet event alignment and row count handling
    
    * Pipe: Add IT for tablet time range filtering
    
    * Update IoTDBPipeSourceIT.java
---
 .../treemodel/auto/basic/IoTDBPipeSourceIT.java    | 55 +++++++++---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  8 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  5 +-
 .../event/common/tablet/PipeTabletCollector.java   |  6 ++
 .../tablet/parser/TabletInsertionEventParser.java  | 12 ++-
 .../TabletInsertionEventTreePatternParser.java     |  3 +-
 .../pipe/event/PipeTabletInsertionEventTest.java   | 98 ++++++++++++++++++++++
 7 files changed, 168 insertions(+), 19 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 9af7cc84316..8f1004eefbc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -35,6 +36,10 @@ import org.apache.iotdb.itbase.env.BaseEnv;
 import 
org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -799,6 +804,7 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
       sourceAttributes.put("source.inclusion", "data");
       sourceAttributes.put("source.start-time", "1970-01-01T08:00:02+08:00");
       sourceAttributes.put("source.end-time", "1970-01-01T08:00:04+08:00");
+      sourceAttributes.put("source.realtime.mode", "stream");
       sourceAttributes.put("user", "root");
 
       final TSStatus status =
@@ -808,11 +814,10 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
                   .setProcessorAttributes(processorAttributes));
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
 
+      final Map<String, String> expectedCountResult = new HashMap<>();
+      expectedCountResult.put("count(root.db.d1.at1)", "3");
       TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          "select count(*) from root.db.**",
-          "count(root.db.d1.at1),",
-          Collections.singleton("3,"));
+          receiverEnv, "select count(*) from root.db.**", expectedCountResult);
 
       // Insert realtime data that overlapped with time range
       TestUtils.executeNonQueries(
@@ -823,11 +828,29 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
               "flush"),
           null);
 
+      expectedCountResult.put("count(root.db.d3.at1)", "3");
       TestUtils.assertDataEventuallyOnEnv(
-          receiverEnv,
-          "select count(*) from root.db.**",
-          "count(root.db.d1.at1),count(root.db.d3.at1),",
-          Collections.singleton("3,3,"));
+          receiverEnv, "select count(*) from root.db.**", expectedCountResult);
+
+      // Session Tablet can have unused timestamp slots when rowSize is 
smaller than maxRowNumber.
+      // The pipe source time range filter should ignore the unused zero tail.
+      final List<IMeasurementSchema> schemas =
+          Collections.singletonList(new MeasurementSchema("at1", 
TSDataType.INT32));
+      final Tablet tabletWithUnusedTail = new Tablet("root.db.d5", schemas, 5);
+      for (int time = 2000; time <= 4000; time += 1000) {
+        final int rowIndex = tabletWithUnusedTail.getRowSize();
+        tabletWithUnusedTail.addTimestamp(rowIndex, time);
+        tabletWithUnusedTail.addValue("at1", rowIndex, time / 1000);
+      }
+      Assert.assertEquals(3, tabletWithUnusedTail.getRowSize());
+      Assert.assertEquals(5, tabletWithUnusedTail.getTimestamps().length);
+      try (final ISession session = senderEnv.getSessionConnection()) {
+        session.insertTablet(tabletWithUnusedTail);
+      }
+
+      expectedCountResult.put("count(root.db.d5.at1)", "3");
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv, "select count(*) from root.db.**", expectedCountResult);
 
       // Insert realtime data that does not overlap with time range
       TestUtils.executeNonQueries(
@@ -840,9 +863,19 @@ public class IoTDBPipeSourceIT extends 
AbstractPipeDualTreeModelAutoIT {
 
       TestUtils.assertDataAlwaysOnEnv(
           receiverEnv,
-          "select count(*) from root.db.**",
-          "count(root.db.d1.at1),count(root.db.d3.at1),",
-          Collections.singleton("3,3,"));
+          "select count(at1) from root.db.d1, root.db.d3, root.db.d5",
+          "count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),",
+          Collections.singleton("3,3,3,"));
+      TestUtils.assertDataAlwaysOnEnv(
+          receiverEnv,
+          "show timeseries root.db.d2.**",
+          
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+          Collections.emptySet());
+      TestUtils.assertDataAlwaysOnEnv(
+          receiverEnv,
+          "show timeseries root.db.d4.**",
+          
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+          Collections.emptySet());
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 3413f16144f..83fe9ba435b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -363,12 +363,14 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
       }
 
       if (insertNode instanceof InsertTabletNode) {
-        final long[] timestamps = ((InsertTabletNode) insertNode).getTimes();
-        if (Objects.isNull(timestamps) || timestamps.length == 0) {
+        final InsertTabletNode insertTabletNode = (InsertTabletNode) 
insertNode;
+        final long[] timestamps = insertTabletNode.getTimes();
+        final int rowCount = insertTabletNode.getRowCount();
+        if (Objects.isNull(timestamps) || rowCount <= 0) {
           return false;
         }
         // We assume that `timestamps` is ordered.
-        return startTime <= timestamps[timestamps.length - 1] && timestamps[0] 
<= endTime;
+        return startTime <= timestamps[rowCount - 1] && timestamps[0] <= 
endTime;
       }
 
       if (insertNode instanceof InsertRowsNode) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 6829f099b47..5bdef252a2f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -376,11 +376,12 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
   @Override
   public boolean mayEventTimeOverlappedWithTimeRange() {
     final long[] timestamps = tablet.getTimestamps();
-    if (Objects.isNull(timestamps) || timestamps.length == 0) {
+    final int rowSize = tablet.getRowSize();
+    if (Objects.isNull(timestamps) || rowSize <= 0) {
       return false;
     }
     // We assume that `timestamps` is ordered.
-    return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <= 
endTime;
+    return startTime <= timestamps[rowSize - 1] && timestamps[0] <= endTime;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
index b9da59f3111..a6a69144028 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
@@ -31,6 +31,12 @@ public class PipeTabletCollector extends 
PipeRawTabletEventConverter implements
     super(pipeTaskMeta, sourceEvent);
   }
 
+  public PipeTabletCollector(
+      PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, boolean isAligned) 
{
+    this(pipeTaskMeta, sourceEvent);
+    this.isAligned = isAligned;
+  }
+
   public PipeTabletCollector(
       PipeTaskMeta pipeTaskMeta,
       EnrichedEvent sourceEvent,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index fa99fa73a6c..594f97d7d60 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -211,8 +211,13 @@ public abstract class TabletInsertionEventParser {
     this.isAligned = insertTabletNode.isAligned();
 
     final long[] originTimestampColumn = insertTabletNode.getTimes();
-    final List<Integer> rowIndexList = 
generateRowIndexList(originTimestampColumn);
-    this.timestampColumn = rowIndexList.stream().mapToLong(i -> 
originTimestampColumn[i]).toArray();
+    final int originRowCount = insertTabletNode.getRowCount();
+    final long[] actualTimestampColumn =
+        originTimestampColumn.length == originRowCount
+            ? originTimestampColumn
+            : Arrays.copyOf(originTimestampColumn, originRowCount);
+    final List<Integer> rowIndexList = 
generateRowIndexList(actualTimestampColumn);
+    this.timestampColumn = rowIndexList.stream().mapToLong(i -> 
actualTimestampColumn[i]).toArray();
 
     final MeasurementSchema[] originMeasurementSchemaList =
         insertTabletNode.getMeasurementSchemas();
@@ -434,6 +439,9 @@ public abstract class TabletInsertionEventParser {
 
   private List<Integer> generateRowIndexList(final long[] 
originTimestampColumn) {
     final int rowCount = originTimestampColumn.length;
+    if (rowCount == 0) {
+      return generateFullRowIndexList(rowCount);
+    }
     if (Objects.isNull(sourceEvent) || !sourceEvent.shouldParseTime()) {
       return generateFullRowIndexList(rowCount);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
index 9655175759e..184fa124d89 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
@@ -185,7 +185,8 @@ public class TabletInsertionEventTreePatternParser extends 
TabletInsertionEventP
   @Override
   public List<TabletInsertionEvent> processTabletWithCollect(
       BiConsumer<Tablet, TabletCollector> consumer) {
-    final PipeTabletCollector tabletCollector = new 
PipeTabletCollector(pipeTaskMeta, sourceEvent);
+    final PipeTabletCollector tabletCollector =
+        new PipeTabletCollector(pipeTaskMeta, sourceEvent, isAligned);
     consumer.accept(convertToTablet(), tabletCollector);
     return tabletCollector.convertToTabletInsertionEvents(shouldReport);
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 1aface542b1..cfe35ef041d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
@@ -54,6 +55,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.time.LocalDate;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -324,6 +326,30 @@ public class PipeTabletInsertionEventTest {
     Assert.assertTrue(isAligned4);
   }
 
+  @Test
+  public void processAlignedTabletWithCollectPreservesAlignmentForTest() {
+    final PipeRawTabletInsertionEvent event =
+        new PipeRawTabletInsertionEvent(
+            tabletForInsertTabletNode, true, new PrefixTreePattern(pattern));
+
+    final List<TabletInsertionEvent> events = new ArrayList<>();
+    event
+        .processTabletWithCollect(
+            (tablet, collector) -> {
+              try {
+                collector.collectTablet(tablet);
+              } catch (final Exception e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .forEach(events::add);
+
+    Assert.assertEquals(1, events.size());
+    final PipeRawTabletInsertionEvent collectedEvent = 
(PipeRawTabletInsertionEvent) events.get(0);
+    Assert.assertEquals(tabletForInsertTabletNode, 
collectedEvent.convertToTablet());
+    Assert.assertTrue(collectedEvent.isAligned());
+  }
+
   @Test
   public void collectRowWithOverriddenTreeDatabaseForTest() {
     final PipeRowCollector rowCollector = new PipeRowCollector(null, null, 
"root.test.sg_0", false);
@@ -525,6 +551,78 @@ public class PipeTabletInsertionEventTest {
     Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
   }
 
+  @Test
+  public void isEventTimeOverlappedWithTimeRangeUsesActualRowSizeForTest() 
throws Exception {
+    final long[] timestamps = new long[] {110L, 111L, 112L, 0L, 0L};
+
+    final Tablet partialTablet = new Tablet(deviceId, Arrays.asList(schemas), 
times.length);
+    partialTablet.setTimestamps(timestamps);
+    partialTablet.setRowSize(3);
+
+    PipeRawTabletInsertionEvent rawEvent =
+        new PipeRawTabletInsertionEvent(partialTablet, 111L, 112L);
+    Assert.assertTrue(rawEvent.mayEventTimeOverlappedWithTimeRange());
+    rawEvent = new PipeRawTabletInsertionEvent(partialTablet, 113L, 
Long.MAX_VALUE);
+    Assert.assertFalse(rawEvent.mayEventTimeOverlappedWithTimeRange());
+
+    final InsertTabletNode partialInsertTabletNode =
+        new InsertTabletNode(
+            new PlanNodeId("partial tablet node"),
+            new PartialPath(deviceId),
+            false,
+            measurementIds,
+            dataTypes,
+            schemas,
+            timestamps,
+            null,
+            insertTabletNode.getColumns(),
+            3);
+
+    final Tablet convertedTablet =
+        new TabletInsertionEventTreePatternParser(
+                partialInsertTabletNode, new PrefixTreePattern(pattern))
+            .convertToTablet();
+    Assert.assertEquals(3, convertedTablet.getRowSize());
+    Assert.assertArrayEquals(
+        new long[] {110L, 111L, 112L},
+        Arrays.copyOf(convertedTablet.getTimestamps(), 
convertedTablet.getRowSize()));
+
+    PipeInsertNodeTabletInsertionEvent insertNodeEvent =
+        new PipeInsertNodeTabletInsertionEvent(
+            false,
+            "root.sg",
+            partialInsertTabletNode,
+            null,
+            0,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            true,
+            111L,
+            112L);
+    Assert.assertTrue(insertNodeEvent.mayEventTimeOverlappedWithTimeRange());
+    insertNodeEvent =
+        new PipeInsertNodeTabletInsertionEvent(
+            false,
+            "root.sg",
+            partialInsertTabletNode,
+            null,
+            0,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            true,
+            113L,
+            Long.MAX_VALUE);
+    Assert.assertFalse(insertNodeEvent.mayEventTimeOverlappedWithTimeRange());
+  }
+
   @Test
   public void testAuthCheck() {
     PipeInsertNodeTabletInsertionEvent event;

Reply via email to