This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6532d297bca Optimize pipe event batching and listener stop (#17864)
6532d297bca is described below
commit 6532d297bca61aee8272476e04c9fcecec2e787b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 9 12:13:21 2026 +0800
Optimize pipe event batching and listener stop (#17864)
---
.../evolvable/batch/PipeTabletEventPlainBatch.java | 22 ++++++-----
.../batch/PipeTransferBatchReqBuilder.java | 43 +++++++++++++---------
.../listener/PipeInsertionDataNodeListener.java | 2 +-
3 files changed, 39 insertions(+), 28 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 34837424b98..b32479e2f1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -47,7 +47,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Objects;
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
@@ -105,14 +105,18 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
insertTablets.getValue().entrySet()) {
// needCopyFlag and tablet
final List<Pair<Boolean, Tablet>> batchTablets = new ArrayList<>();
+ final int totalRowSize = tabletEntry.getValue().getLeft();
for (final Tablet tablet : tabletEntry.getValue().getRight()) {
boolean success = false;
for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
+ if (!canAppendTablet(tabletPair.getRight(), tablet)) {
+ continue;
+ }
if (tabletPair.getLeft()) {
tabletPair.setRight(copyTablet(tabletPair.getRight()));
tabletPair.setLeft(Boolean.FALSE);
}
- if (tabletPair.getRight().append(tablet,
tabletEntry.getValue().getLeft())) {
+ if (tabletPair.getRight().append(tablet, totalRowSize)) {
success = true;
break;
}
@@ -203,21 +207,21 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
}
private long constructTabletBatch(final Tablet tablet, final String
databaseName) {
- final AtomicLong size = new AtomicLong(0);
final Pair<Integer, List<Tablet>> currentBatch =
tableModelTabletMap
- .computeIfAbsent(
- databaseName,
- k -> {
- size.addAndGet(RamUsageEstimator.sizeOf(databaseName));
- return new HashMap<>();
- })
+ .computeIfAbsent(databaseName, k -> new HashMap<>())
.computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new
ArrayList<>()));
currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize());
currentBatch.getRight().add(tablet);
return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
}
+ private static boolean canAppendTablet(final Tablet target, final Tablet
source) {
+ return Objects.equals(target.getDeviceId(), source.getDeviceId())
+ && Objects.equals(target.getSchemas(), source.getSchemas())
+ && Objects.equals(target.getColumnTypes(), source.getColumnTypes());
+ }
+
public static Tablet copyTablet(final Tablet tablet) {
final Object[] copiedValues = new Object[tablet.getValues().length];
for (int i = 0; i < tablet.getValues().length; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index b3a8884a146..ce70cf6f6e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -40,10 +40,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
@@ -85,8 +85,7 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
// If the leader cache is enabled, the batch will be divided by the leader
endpoint,
// each endpoint has a batch.
// This is only used in plain batch since tsfile does not return redirection
info.
- private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
- new ConcurrentHashMap<>();
+ private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
new HashMap<>();
public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
final boolean usingTsFileBatch =
@@ -182,22 +181,29 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
getAllNonEmptyAndShouldEmitBatches() {
final List<Pair<TEndPoint, PipeTabletEventBatch>>
nonEmptyAndShouldEmitBatches =
- new ArrayList<>();
+ new ArrayList<>(endPointToBatch.size() + 1);
if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
}
- endPointToBatch.forEach(
- (endPoint, batch) -> {
- if (!batch.isEmpty() && batch.shouldEmit()) {
- nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
- }
- });
+ for (final Map.Entry<TEndPoint, PipeTabletEventPlainBatch> entry :
endPointToBatch.entrySet()) {
+ final PipeTabletEventPlainBatch batch = entry.getValue();
+ if (!batch.isEmpty() && batch.shouldEmit()) {
+ nonEmptyAndShouldEmitBatches.add(new Pair<>(entry.getKey(), batch));
+ }
+ }
return nonEmptyAndShouldEmitBatches;
}
- public boolean isEmpty() {
- return defaultBatch.isEmpty()
- &&
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
+ public synchronized boolean isEmpty() {
+ if (!defaultBatch.isEmpty()) {
+ return false;
+ }
+ for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
+ if (!batch.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
}
public synchronized void discardEventsOfPipe(
@@ -210,12 +216,13 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(committerKey));
}
- public int size() {
+ public synchronized int size() {
try {
- return defaultBatch.events.size()
- + endPointToBatch.values().stream()
- .map(batch -> batch.events.size())
- .reduce(0, Integer::sum);
+ int size = defaultBatch.events.size();
+ for (final PipeTabletEventPlainBatch batch : endPointToBatch.values()) {
+ size += batch.events.size();
+ }
+ return size;
} catch (final Exception e) {
LOGGER.warn(
DataNodePipeMessages.FAILED_TO_GET_THE_SIZE_OF_PIPETRANSFERBATCHREQBUILDER,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 76c001b2696..7e3c25e8062 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -66,7 +66,7 @@ public class PipeInsertionDataNodeListener {
});
}
- public synchronized void stopListenAndAssign(
+ public void stopListenAndAssign(
final int dataRegionId, final PipeRealtimeDataRegionSource source) {
PipeDataRegionAssigner assignerToClose = null;