This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ceaae51b35a Pipe: Fix tablet event alignment and row count handling (#18034)
ceaae51b35a is described below
commit ceaae51b35a734a42d5a7e749db9c44de23d68f1
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:11:46 2026 +0800
Pipe: Fix tablet event alignment and row count handling (#18034)
* Pipe: Fix tablet event alignment and row count handling (#17999)
* Pipe: Fix tablet event alignment and row count handling
* Pipe: Add IT for tablet time range filtering
* Update IoTDBPipeSourceIT.java
(cherry picked from commit f1b4508802980f1f09aca3dd1ac204b1692a9873)
* Fix pipe source IT tablet schema type
* Fix pipe source tablet row count test
---
.../pipe/it/autocreate/IoTDBPipeSourceIT.java | 53 ++++++++++++----
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 8 ++-
.../common/tablet/PipeRawTabletInsertionEvent.java | 5 +-
.../event/common/tablet/PipeTabletCollector.java | 6 ++
.../tablet/TabletInsertionDataContainer.java | 15 ++++-
.../pipe/event/PipeTabletInsertionEventTest.java | 73 ++++++++++++++++++++++
6 files changed, 141 insertions(+), 19 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
index bb160a6f295..2bd290bfc81 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSourceIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -33,6 +34,9 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -791,11 +795,10 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualAutoIT {
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ final Map<String, String> expectedCountResult = new HashMap<>();
+ expectedCountResult.put("count(root.db.d1.at1)", "3");
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "select count(*) from root.**",
- "count(root.db.d1.at1),",
- Collections.singleton("3,"));
+ receiverEnv, "select count(*) from root.db.**", expectedCountResult);
// Insert realtime data that overlapped with time range
TestUtils.executeNonQueries(
@@ -806,11 +809,29 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualAutoIT {
"flush"),
null);
+ expectedCountResult.put("count(root.db.d3.at1)", "3");
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv,
- "select count(*) from root.**",
- "count(root.db.d1.at1),count(root.db.d3.at1),",
- Collections.singleton("3,3,"));
+ receiverEnv, "select count(*) from root.db.**", expectedCountResult);
+
+ // Session Tablet can have unused timestamp slots when rowSize is smaller than maxRowNumber.
+ // The pipe source time range filter should ignore the unused zero tail.
+ final List<MeasurementSchema> schemas =
+ Collections.singletonList(new MeasurementSchema("at1",
TSDataType.INT32));
+ final Tablet tabletWithUnusedTail = new Tablet("root.db.d5", schemas, 5);
+ for (int time = 2000; time <= 4000; time += 1000) {
+ final int rowIndex = tabletWithUnusedTail.rowSize++;
+ tabletWithUnusedTail.addTimestamp(rowIndex, time);
+ tabletWithUnusedTail.addValue("at1", rowIndex, time / 1000);
+ }
+ Assert.assertEquals(3, tabletWithUnusedTail.rowSize);
+ Assert.assertEquals(5, tabletWithUnusedTail.timestamps.length);
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ session.insertTablet(tabletWithUnusedTail);
+ }
+
+ expectedCountResult.put("count(root.db.d5.at1)", "3");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "select count(*) from root.db.**", expectedCountResult);
// Insert realtime data that does not overlap with time range
TestUtils.executeNonQueries(
@@ -823,9 +844,19 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualAutoIT {
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
- "select count(*) from root.**",
- "count(root.db.d1.at1),count(root.db.d3.at1),",
- Collections.singleton("3,3,"));
+ "select count(at1) from root.db.d1, root.db.d3, root.db.d5",
+ "count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),",
+ Collections.singleton("3,3,3,"));
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "show timeseries root.db.d2.**",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ Collections.emptySet());
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "show timeseries root.db.d4.**",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ Collections.emptySet());
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index bc7040a0598..84a0f533e23 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -214,12 +214,14 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
}
if (insertNode instanceof InsertTabletNode) {
- final long[] timestamps = ((InsertTabletNode) insertNode).getTimes();
- if (Objects.isNull(timestamps) || timestamps.length == 0) {
+ final InsertTabletNode insertTabletNode = (InsertTabletNode)
insertNode;
+ final long[] timestamps = insertTabletNode.getTimes();
+ final int rowCount = insertTabletNode.getRowCount();
+ if (Objects.isNull(timestamps) || rowCount <= 0) {
return false;
}
// We assume that `timestamps` is ordered.
- return startTime <= timestamps[timestamps.length - 1] && timestamps[0]
<= endTime;
+ return startTime <= timestamps[rowCount - 1] && timestamps[0] <=
endTime;
}
if (insertNode instanceof InsertRowsNode) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 6d6995f0231..f47544ab64f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -318,11 +318,12 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent
@Override
public boolean mayEventTimeOverlappedWithTimeRange() {
final long[] timestamps = tablet.timestamps;
- if (Objects.isNull(timestamps) || timestamps.length == 0) {
+ final int rowSize = tablet.rowSize;
+ if (Objects.isNull(timestamps) || rowSize <= 0) {
return false;
}
// We assume that `timestamps` is ordered.
- return startTime <= timestamps[timestamps.length - 1] && timestamps[0] <=
endTime;
+ return startTime <= timestamps[rowSize - 1] && timestamps[0] <= endTime;
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
index b9da59f3111..a6a69144028 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
@@ -31,6 +31,12 @@ public class PipeTabletCollector extends
PipeRawTabletEventConverter implements
super(pipeTaskMeta, sourceEvent);
}
+ public PipeTabletCollector(
+ PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, boolean isAligned)
{
+ this(pipeTaskMeta, sourceEvent);
+ this.isAligned = isAligned;
+ }
+
public PipeTabletCollector(
PipeTaskMeta pipeTaskMeta,
EnrichedEvent sourceEvent,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index d8c2bccaa97..a2a8c27ef26 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -218,8 +218,13 @@ public class TabletInsertionDataContainer {
this.isAligned = insertTabletNode.isAligned();
final long[] originTimestampColumn = insertTabletNode.getTimes();
- final List<Integer> rowIndexList =
generateRowIndexList(originTimestampColumn);
- this.timestampColumn = rowIndexList.stream().mapToLong(i ->
originTimestampColumn[i]).toArray();
+ final int originRowCount = insertTabletNode.getRowCount();
+ final long[] actualTimestampColumn =
+ originTimestampColumn.length == originRowCount
+ ? originTimestampColumn
+ : Arrays.copyOf(originTimestampColumn, originRowCount);
+ final List<Integer> rowIndexList =
generateRowIndexList(actualTimestampColumn);
+ this.timestampColumn = rowIndexList.stream().mapToLong(i ->
actualTimestampColumn[i]).toArray();
generateColumnIndexMapper(
insertTabletNode.getMeasurements(),
@@ -407,6 +412,9 @@ public class TabletInsertionDataContainer {
private List<Integer> generateRowIndexList(final long[]
originTimestampColumn) {
final int rowCount = originTimestampColumn.length;
+ if (rowCount == 0) {
+ return generateFullRowIndexList(rowCount);
+ }
if (Objects.isNull(sourceEvent) || !sourceEvent.shouldParseTime()) {
return generateFullRowIndexList(rowCount);
}
@@ -680,7 +688,8 @@ public class TabletInsertionDataContainer {
public List<TabletInsertionEvent> processTabletWithCollect(
BiConsumer<Tablet, TabletCollector> consumer) {
- final PipeTabletCollector tabletCollector = new
PipeTabletCollector(pipeTaskMeta, sourceEvent);
+ final PipeTabletCollector tabletCollector =
+ new PipeTabletCollector(pipeTaskMeta, sourceEvent, isAligned);
consumer.accept(convertToTablet(), tabletCollector);
return tabletCollector.convertToTabletInsertionEvents(shouldReport);
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 8a290bd1803..b46a86e9944 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow;
import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import
org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
@@ -42,6 +44,7 @@ import org.junit.Before;
import org.junit.Test;
import java.time.LocalDate;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -323,6 +326,30 @@ public class PipeTabletInsertionEventTest {
Assert.assertTrue(isAligned4);
}
+ @Test
+ public void processAlignedTabletWithCollectPreservesAlignmentForTest() {
+ final PipeRawTabletInsertionEvent event =
+ new PipeRawTabletInsertionEvent(
+ tabletForInsertTabletNode, true, new PrefixPipePattern(pattern));
+
+ final List<TabletInsertionEvent> events = new ArrayList<>();
+ event
+ .processTabletWithCollect(
+ (tablet, collector) -> {
+ try {
+ collector.collectTablet(tablet);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(events::add);
+
+ Assert.assertEquals(1, events.size());
+ final PipeRawTabletInsertionEvent collectedEvent =
(PipeRawTabletInsertionEvent) events.get(0);
+ Assert.assertEquals(tabletForInsertTabletNode,
collectedEvent.convertToTablet());
+ Assert.assertTrue(collectedEvent.isAligned());
+ }
+
@Test
public void collectRowWithOverriddenTreeDatabaseForTest() {
final PipeRowCollector rowCollector = new PipeRowCollector(null, null,
"root.test.sg_0", false);
@@ -449,4 +476,50 @@ public class PipeTabletInsertionEventTest {
event = new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 115L,
Long.MAX_VALUE);
Assert.assertFalse(event.mayEventTimeOverlappedWithTimeRange());
}
+
+ @Test
+ public void isEventTimeOverlappedWithTimeRangeUsesActualRowSizeForTest()
throws Exception {
+ final long[] timestamps = new long[] {110L, 111L, 112L, 0L, 0L};
+
+ final Tablet partialTablet = new Tablet(deviceId, Arrays.asList(schemas),
times.length);
+ partialTablet.timestamps = timestamps;
+ partialTablet.rowSize = 3;
+
+ PipeRawTabletInsertionEvent rawEvent =
+ new PipeRawTabletInsertionEvent(partialTablet, 111L, 112L);
+ Assert.assertTrue(rawEvent.mayEventTimeOverlappedWithTimeRange());
+ rawEvent = new PipeRawTabletInsertionEvent(partialTablet, 113L,
Long.MAX_VALUE);
+ Assert.assertFalse(rawEvent.mayEventTimeOverlappedWithTimeRange());
+
+ final InsertTabletNode partialInsertTabletNode =
+ new InsertTabletNode(
+ new PlanNodeId("partial tablet node"),
+ new PartialPath(deviceId),
+ false,
+ measurementIds,
+ dataTypes,
+ schemas,
+ timestamps,
+ null,
+ insertTabletNode.getColumns(),
+ 3);
+
+ final Tablet convertedTablet =
+ new TabletInsertionDataContainer(partialInsertTabletNode, new
PrefixPipePattern(pattern))
+ .convertToTablet();
+ Assert.assertEquals(3, convertedTablet.rowSize);
+ Assert.assertArrayEquals(
+ new long[] {110L, 111L, 112L},
+ Arrays.copyOf(convertedTablet.timestamps, convertedTablet.rowSize));
+
+ PipeInsertNodeTabletInsertionEvent insertNodeEvent =
+ new PipeInsertNodeTabletInsertionEvent(partialInsertTabletNode)
+ .shallowCopySelfAndBindPipeTaskMetaForProgressReport(null, 0,
null, null, 111L, 112L);
+ Assert.assertTrue(insertNodeEvent.mayEventTimeOverlappedWithTimeRange());
+ insertNodeEvent =
+ new PipeInsertNodeTabletInsertionEvent(partialInsertTabletNode)
+ .shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ null, 0, null, null, 113L, Long.MAX_VALUE);
+ Assert.assertFalse(insertNodeEvent.mayEventTimeOverlappedWithTimeRange());
+ }
}