This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f1b45088029 Pipe: Fix tablet event alignment and row count handling
(#17999)
f1b45088029 is described below
commit f1b4508802980f1f09aca3dd1ac204b1692a9873
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 16:06:25 2026 +0800
Pipe: Fix tablet event alignment and row count handling (#17999)
* Pipe: Fix tablet event alignment and row count handling
* Pipe: Add IT for tablet time range filtering
* Update IoTDBPipeSourceIT.java
---
.../treemodel/auto/basic/IoTDBPipeSourceIT.java | 55 +++++++++---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 8 +-
.../common/tablet/PipeRawTabletInsertionEvent.java | 5 +-
.../event/common/tablet/PipeTabletCollector.java | 6 ++
.../tablet/parser/TabletInsertionEventParser.java | 12 ++-
.../TabletInsertionEventTreePatternParser.java | 3 +-
.../pipe/event/PipeTabletInsertionEventTest.java | 98 ++++++++++++++++++++++
7 files changed, 168 insertions(+), 19 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index 9af7cc84316..8f1004eefbc 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
@@ -35,6 +36,10 @@ import org.apache.iotdb.itbase.env.BaseEnv;
import
org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -799,6 +804,7 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
sourceAttributes.put("source.inclusion", "data");
sourceAttributes.put("source.start-time", "1970-01-01T08:00:02+08:00");
sourceAttributes.put("source.end-time", "1970-01-01T08:00:04+08:00");
+ sourceAttributes.put("source.realtime.mode", "stream");
sourceAttributes.put("user", "root");
final TSStatus status =
@@ -808,11 +814,10 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ final Map<String, String> expectedCountResult = new HashMap<>();
+ expectedCountResult.put("count(root.db.d1.at1)", "3");
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "select count(*) from root.db.**",
- "count(root.db.d1.at1),",
- Collections.singleton("3,"));
+ receiverEnv, "select count(*) from root.db.**", expectedCountResult);
// Insert realtime data that overlapped with time range
TestUtils.executeNonQueries(
@@ -823,11 +828,29 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
"flush"),
null);
+ expectedCountResult.put("count(root.db.d3.at1)", "3");
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "select count(*) from root.db.**",
- "count(root.db.d1.at1),count(root.db.d3.at1),",
- Collections.singleton("3,3,"));
+ receiverEnv, "select count(*) from root.db.**", expectedCountResult);
+
+ // Session Tablet can have unused timestamp slots when rowSize is
smaller than maxRowNumber.
+ // The pipe source time range filter should ignore the unused zero tail.
+ final List<IMeasurementSchema> schemas =
+ Collections.singletonList(new MeasurementSchema("at1",
TSDataType.INT32));
+ final Tablet tabletWithUnusedTail = new Tablet("root.db.d5", schemas, 5);
+ for (int time = 2000; time <= 4000; time += 1000) {
+ final int rowIndex = tabletWithUnusedTail.getRowSize();
+ tabletWithUnusedTail.addTimestamp(rowIndex, time);
+ tabletWithUnusedTail.addValue("at1", rowIndex, time / 1000);
+ }
+ Assert.assertEquals(3, tabletWithUnusedTail.getRowSize());
+ Assert.assertEquals(5, tabletWithUnusedTail.getTimestamps().length);
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ session.insertTablet(tabletWithUnusedTail);
+ }
+
+ expectedCountResult.put("count(root.db.d5.at1)", "3");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "select count(*) from root.db.**", expectedCountResult);
// Insert realtime data that does not overlap with time range
TestUtils.executeNonQueries(
@@ -840,9 +863,19 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
- "select count(*) from root.db.**",
- "count(root.db.d1.at1),count(root.db.d3.at1),",
- Collections.singleton("3,3,"));
+ "select count(at1) from root.db.d1, root.db.d3, root.db.d5",
+ "count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),",
+ Collections.singleton("3,3,3,"));
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "show timeseries root.db.d2.**",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ Collections.emptySet());
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "show timeseries root.db.d4.**",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ Collections.emptySet());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 3413f16144f..83fe9ba435b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -363,12 +363,14 @@ public class PipeInsertNodeTabletInsertionEvent extends
PipeInsertionEvent
}
if (insertNode instanceof InsertTabletNode) {
- final long[] timestamps = ((InsertTabletNode) insertNode).getTimes();
- if (Objects.isNull(timestamps) || timestamps.length == 0) {
+ final InsertTabletNode insertTabletNode = (InsertTabletNode)
insertNode;
+ final long[] timestamps = insertTabletNode.getTimes();
+ final int rowCount = insertTabletNode.getRowCount();
+ if (Objects.isNull(timestamps) || rowCount <= 0) {
return false;
}
// We assume that `timestamps` is ordered.
- return startTime <= timestamps[timestamps.length - 1] && timestamps[0]
<= endTime;
+ return startTime <= timestamps[rowCount - 1] && timestamps[0] <=
endTime;
}
if (insertNode instanceof InsertRowsNode) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 6829f099b47..5bdef252a2f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -376,11 +376,12 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
@Override
public boolean mayEventTimeOverlappedWithTimeRange() {
final long[] timestamps = tablet.getTimestamps();
- if (Objects.isNull(timestamps) || timestamps.length == 0) {
+ final int rowSize = tablet.getRowSize();
+ if (Objects.isNull(timestamps) || rowSize <= 0) {
return false;
}
// We assume that `timestamps` is ordered.
- return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <=
endTime;
+ return startTime <= timestamps[rowSize - 1] && timestamps[0] <= endTime;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
index b9da59f3111..a6a69144028 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
@@ -31,6 +31,12 @@ public class PipeTabletCollector extends
PipeRawTabletEventConverter implements
super(pipeTaskMeta, sourceEvent);
}
+ public PipeTabletCollector(
+ PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, boolean isAligned)
{
+ this(pipeTaskMeta, sourceEvent);
+ this.isAligned = isAligned;
+ }
+
public PipeTabletCollector(
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index fa99fa73a6c..594f97d7d60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -211,8 +211,13 @@ public abstract class TabletInsertionEventParser {
this.isAligned = insertTabletNode.isAligned();
final long[] originTimestampColumn = insertTabletNode.getTimes();
- final List<Integer> rowIndexList =
generateRowIndexList(originTimestampColumn);
- this.timestampColumn = rowIndexList.stream().mapToLong(i ->
originTimestampColumn[i]).toArray();
+ final int originRowCount = insertTabletNode.getRowCount();
+ final long[] actualTimestampColumn =
+ originTimestampColumn.length == originRowCount
+ ? originTimestampColumn
+ : Arrays.copyOf(originTimestampColumn, originRowCount);
+ final List<Integer> rowIndexList =
generateRowIndexList(actualTimestampColumn);
+ this.timestampColumn = rowIndexList.stream().mapToLong(i ->
actualTimestampColumn[i]).toArray();
final MeasurementSchema[] originMeasurementSchemaList =
insertTabletNode.getMeasurementSchemas();
@@ -434,6 +439,9 @@ public abstract class TabletInsertionEventParser {
private List<Integer> generateRowIndexList(final long[]
originTimestampColumn) {
final int rowCount = originTimestampColumn.length;
+ if (rowCount == 0) {
+ return generateFullRowIndexList(rowCount);
+ }
if (Objects.isNull(sourceEvent) || !sourceEvent.shouldParseTime()) {
return generateFullRowIndexList(rowCount);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
index 9655175759e..184fa124d89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
@@ -185,7 +185,8 @@ public class TabletInsertionEventTreePatternParser extends
TabletInsertionEventP
@Override
public List<TabletInsertionEvent> processTabletWithCollect(
BiConsumer<Tablet, TabletCollector> consumer) {
- final PipeTabletCollector tabletCollector = new
PipeTabletCollector(pipeTaskMeta, sourceEvent);
+ final PipeTabletCollector tabletCollector =
+ new PipeTabletCollector(pipeTaskMeta, sourceEvent, isAligned);
consumer.accept(convertToTablet(), tabletCollector);
return tabletCollector.convertToTabletInsertionEvents(shouldReport);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 1aface542b1..cfe35ef041d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
@@ -54,6 +55,7 @@ import org.junit.Before;
import org.junit.Test;
import java.time.LocalDate;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -324,6 +326,30 @@ public class PipeTabletInsertionEventTest {
Assert.assertTrue(isAligned4);
}
+ @Test
+ public void processAlignedTabletWithCollectPreservesAlignmentForTest() {
+ final PipeRawTabletInsertionEvent event =
+ new PipeRawTabletInsertionEvent(
+ tabletForInsertTabletNode, true, new PrefixTreePattern(pattern));
+
+ final List<TabletInsertionEvent> events = new ArrayList<>();
+ event
+ .processTabletWithCollect(
+ (tablet, collector) -> {
+ try {
+ collector.collectTablet(tablet);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(events::add);
+
+ Assert.assertEquals(1, events.size());
+ final PipeRawTabletInsertionEvent collectedEvent =
(PipeRawTabletInsertionEvent) events.get(0);
+ Assert.assertEquals(tabletForInsertTabletNode,
collectedEvent.convertToTablet());
+ Assert.assertTrue(collectedEvent.isAligned());
+ }
+
@Test
public void collectRowWithOverriddenTreeDatabaseForTest() {
final PipeRowCollector rowCollector = new PipeRowCollector(null, null,
"root.test.sg_0", false);
@@ -525,6 +551,78 @@ public class PipeTabletInsertionEventTest {
Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
}
+ @Test
+ public void isEventTimeOverlappedWithTimeRangeUsesActualRowSizeForTest()
throws Exception {
+ final long[] timestamps = new long[] {110L, 111L, 112L, 0L, 0L};
+
+ final Tablet partialTablet = new Tablet(deviceId, Arrays.asList(schemas),
times.length);
+ partialTablet.setTimestamps(timestamps);
+ partialTablet.setRowSize(3);
+
+ PipeRawTabletInsertionEvent rawEvent =
+ new PipeRawTabletInsertionEvent(partialTablet, 111L, 112L);
+ Assert.assertTrue(rawEvent.mayEventTimeOverlappedWithTimeRange());
+ rawEvent = new PipeRawTabletInsertionEvent(partialTablet, 113L,
Long.MAX_VALUE);
+ Assert.assertFalse(rawEvent.mayEventTimeOverlappedWithTimeRange());
+
+ final InsertTabletNode partialInsertTabletNode =
+ new InsertTabletNode(
+ new PlanNodeId("partial tablet node"),
+ new PartialPath(deviceId),
+ false,
+ measurementIds,
+ dataTypes,
+ schemas,
+ timestamps,
+ null,
+ insertTabletNode.getColumns(),
+ 3);
+
+ final Tablet convertedTablet =
+ new TabletInsertionEventTreePatternParser(
+ partialInsertTabletNode, new PrefixTreePattern(pattern))
+ .convertToTablet();
+ Assert.assertEquals(3, convertedTablet.getRowSize());
+ Assert.assertArrayEquals(
+ new long[] {110L, 111L, 112L},
+ Arrays.copyOf(convertedTablet.getTimestamps(),
convertedTablet.getRowSize()));
+
+ PipeInsertNodeTabletInsertionEvent insertNodeEvent =
+ new PipeInsertNodeTabletInsertionEvent(
+ false,
+ "root.sg",
+ partialInsertTabletNode,
+ null,
+ 0,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ 111L,
+ 112L);
+ Assert.assertTrue(insertNodeEvent.mayEventTimeOverlappedWithTimeRange());
+ insertNodeEvent =
+ new PipeInsertNodeTabletInsertionEvent(
+ false,
+ "root.sg",
+ partialInsertTabletNode,
+ null,
+ 0,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ 113L,
+ Long.MAX_VALUE);
+ Assert.assertFalse(insertNodeEvent.mayEventTimeOverlappedWithTimeRange());
+ }
+
@Test
public void testAuthCheck() {
PipeInsertNodeTabletInsertionEvent event;