This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6fd3870b90a Pipe: Optimized the table model write latency by batching & fixed the NPE caused by tablet event sorting
6fd3870b90a is described below
commit 6fd3870b90a0b92487b6bdd59dacdc7db59e87a8
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 31 20:00:21 2025 +0800
Pipe: Optimized the table model write latency by batching & fixed the NPE caused by tablet event sorting
---
.../evolvable/batch/PipeTabletEventPlainBatch.java | 130 ++++++++++++++-------
.../request/PipeTransferTabletBatchReqV2.java | 48 +++++++-
.../sink/util/sorter/PipeTabletEventSorter.java | 2 +-
3 files changed, 137 insertions(+), 43 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 31b10736499..a68c5ae9602 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -22,14 +22,17 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.record.Tablet;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -38,7 +41,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
@@ -51,13 +54,13 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
private final List<String> insertNodeDataBases = new ArrayList<>();
private final List<String> tabletDataBases = new ArrayList<>();
+ // database -> tableName -> Pair<size, tablets to batch>
+ private final Map<String, Map<String, Pair<Integer, List<Tablet>>>>
tableModelTabletMap =
+ new HashMap<>();
+
// Used to rate limit when transferring data
private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new
HashMap<>();
- PipeTabletEventPlainBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
- super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
- }
-
PipeTabletEventPlainBatch(
final int maxDelayInMs,
final long requestMaxBatchSizeInBytes,
@@ -66,9 +69,8 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
}
@Override
- protected boolean constructBatch(final TabletInsertionEvent event)
- throws WALPipeException, IOException {
- final int bufferSize = buildTabletInsertionBuffer(event);
+ protected boolean constructBatch(final TabletInsertionEvent event) throws
IOException {
+ final long bufferSize = buildTabletInsertionBuffer(event);
totalBufferSize += bufferSize;
pipe2BytesAccumulated.compute(
new Pair<>(
@@ -89,11 +91,45 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
binaryDataBases.clear();
insertNodeDataBases.clear();
tabletDataBases.clear();
+ tableModelTabletMap.clear();
pipe2BytesAccumulated.clear();
}
public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
+ for (final Map.Entry<String, Map<String, Pair<Integer, List<Tablet>>>>
insertTablets :
+ tableModelTabletMap.entrySet()) {
+ final String databaseName = insertTablets.getKey();
+ for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
+ insertTablets.getValue().entrySet()) {
+ final List<Tablet> batchTablets = new ArrayList<>();
+ for (final Tablet tablet : tabletEntry.getValue().getRight()) {
+ boolean success = false;
+ for (final Tablet batchTablet : batchTablets) {
+ if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) {
+ success = true;
+ break;
+ }
+ }
+ if (!success) {
+ batchTablets.add(tablet);
+ }
+ }
+ for (final Tablet batchTablet : batchTablets) {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ batchTablet.serialize(outputStream);
+ ReadWriteIOUtils.write(true, outputStream);
+ tabletBuffers.add(
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
+ }
+ tabletDataBases.add(databaseName);
+ }
+ }
+ }
+
+ tableModelTabletMap.clear();
+
return PipeTransferTabletBatchReqV2.toTPipeTransferReq(
binaryBuffers,
insertNodeBuffers,
@@ -111,57 +147,71 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
return pipe2BytesAccumulated;
}
- private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
- throws IOException, WALPipeException {
- int databaseEstimateSize = 0;
+ private long buildTabletInsertionBuffer(final TabletInsertionEvent event)
throws IOException {
+ long estimateSize = 0;
final ByteBuffer buffer;
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
(PipeInsertNodeTabletInsertionEvent) event;
- // Read the bytebuffer from the wal file and transfer it directly
without serializing or
- // deserializing if possible
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNode();
- if (Objects.isNull(insertNode)) {
- buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
- binaryBuffers.add(buffer);
- if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
- databaseEstimateSize =
-
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
-
binaryDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
- } else {
- databaseEstimateSize = 4;
- binaryDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
- }
- } else {
+ if (!(insertNode instanceof RelationalInsertTabletNode)) {
buffer = insertNode.serializeToByteBuffer();
insertNodeBuffers.add(buffer);
if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
- databaseEstimateSize =
-
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
+ estimateSize =
+ RamUsageEstimator.sizeOf(
+
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
} else {
- databaseEstimateSize = 4;
+ estimateSize = 4;
insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
}
+ estimateSize += buffer.limit();
+ } else {
+ for (final Tablet tablet :
+ ((PipeInsertNodeTabletInsertionEvent) event).convertToTablets()) {
+ estimateSize +=
+ constructTabletBatch(
+ tablet,
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
+ }
}
} else {
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) event;
- try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
- final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
- ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(),
outputStream);
- buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
- }
- tabletBuffers.add(buffer);
if (pipeRawTabletInsertionEvent.isTableModelEvent()) {
- databaseEstimateSize =
pipeRawTabletInsertionEvent.getTableModelDatabaseName().length();
-
tabletDataBases.add(pipeRawTabletInsertionEvent.getTableModelDatabaseName());
+ estimateSize =
+ constructTabletBatch(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.getTableModelDatabaseName());
} else {
- databaseEstimateSize = 4;
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+
pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
+ ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(),
outputStream);
+ buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ estimateSize = 4 + buffer.limit();
+ tabletBuffers.add(buffer);
tabletDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
}
}
- return buffer.limit() + databaseEstimateSize;
+
+ return estimateSize;
+ }
+
+ private long constructTabletBatch(final Tablet tablet, final String
databaseName) {
+ final AtomicLong size = new AtomicLong(0);
+ final Pair<Integer, List<Tablet>> currentBatch =
+ tableModelTabletMap
+ .computeIfAbsent(
+ databaseName,
+ k -> {
+ size.addAndGet(RamUsageEstimator.sizeOf(databaseName));
+ return new HashMap<>();
+ })
+ .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new
ArrayList<>()));
+ currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize());
+ currentBatch.getRight().add(tablet);
+ return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index b7d7d44db85..d9c3fabfae8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -39,7 +39,9 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq {
@@ -62,6 +64,8 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new
ArrayList<>();
+ final Map<String, List<InsertRowStatement>>
tableModelDatabaseInsertRowStatementMap =
+ new HashMap<>();
for (final PipeTransferTabletBinaryReqV2 binaryReq : binaryReqs) {
final InsertBaseStatement statement = binaryReq.constructStatement();
@@ -69,7 +73,22 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
continue;
}
if (statement.isWriteToTable()) {
- statements.add(statement);
+ if (statement instanceof InsertRowStatement) {
+ tableModelDatabaseInsertRowStatementMap
+ .computeIfAbsent(statement.getDatabaseName().get(), k -> new
ArrayList<>())
+ .add((InsertRowStatement) statement);
+ } else if (statement instanceof InsertTabletStatement) {
+ statements.add(statement);
+ } else if (statement instanceof InsertRowsStatement) {
+ tableModelDatabaseInsertRowStatementMap
+ .computeIfAbsent(statement.getDatabaseName().get(), k -> new
ArrayList<>())
+ .addAll(((InsertRowsStatement)
statement).getInsertRowStatementList());
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "unknown InsertBaseStatement %s constructed from
PipeTransferTabletBinaryReqV2.",
+ binaryReq));
+ }
continue;
}
if (statement instanceof InsertRowStatement) {
@@ -93,7 +112,22 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
continue;
}
if (statement.isWriteToTable()) {
- statements.add(statement);
+ if (statement instanceof InsertRowStatement) {
+ tableModelDatabaseInsertRowStatementMap
+ .computeIfAbsent(statement.getDatabaseName().get(), k -> new
ArrayList<>())
+ .add((InsertRowStatement) statement);
+ } else if (statement instanceof InsertTabletStatement) {
+ statements.add(statement);
+ } else if (statement instanceof InsertRowsStatement) {
+ tableModelDatabaseInsertRowStatementMap
+ .computeIfAbsent(statement.getDatabaseName().get(), k -> new
ArrayList<>())
+ .addAll(((InsertRowsStatement)
statement).getInsertRowStatementList());
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "unknown InsertBaseStatement %s constructed from
PipeTransferTabletBinaryReqV2.",
+ insertNodeReq));
+ }
continue;
}
if (statement instanceof InsertRowStatement) {
@@ -131,6 +165,16 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
if (!insertMultiTabletsStatement.isEmpty()) {
statements.add(insertMultiTabletsStatement);
}
+
+ for (final Map.Entry<String, List<InsertRowStatement>> insertRows :
+ tableModelDatabaseInsertRowStatementMap.entrySet()) {
+ final InsertRowsStatement statement = new InsertRowsStatement();
+ statement.setWriteToTable(true);
+ statement.setDatabaseName(insertRows.getKey());
+ statement.setInsertRowStatementList(insertRows.getValue());
+ statements.add(statement);
+ }
+
return statements;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
index 20847173ffc..c9857c9eaba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
@@ -153,7 +153,7 @@ public class PipeTabletEventSorter {
private int getLastNonnullIndex(
final int i, final BitMap originalBitMap, final BitMap
deDuplicatedBitMap) {
if (deDuplicatedIndex == null) {
- if (originalBitMap.isMarked(index[i])) {
+ if (originalBitMap != null && originalBitMap.isMarked(index[i])) {
deDuplicatedBitMap.mark(i);
}
return index[i];