This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 226fc6fd684 Pipe: Modify Sink Batch Metrics (#16018)
226fc6fd684 is described below
commit 226fc6fd684834dd1797fcd83fa732da1472fb79
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Jul 29 15:51:57 2025 +0800
Pipe: Modify Sink Batch Metrics (#16018)
* Pipe: Modify Sink Batch Metrics
* update
* update
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 27 +++++
.../metric/sink/PipeDataRegionSinkMetrics.java | 111 +++++++++++++++------
.../evolvable/batch/PipeTabletEventBatch.java | 19 +++-
.../evolvable/batch/PipeTabletEventPlainBatch.java | 17 ++--
.../batch/PipeTabletEventTsFileBatch.java | 21 ++--
.../batch/PipeTransferBatchReqBuilder.java | 51 +++++++++-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 29 ++++++
.../thrift/sync/IoTDBDataRegionSyncSink.java | 29 ++++++
.../commons/pipe/sink/protocol/IoTDBSink.java | 17 ++++
9 files changed, 265 insertions(+), 56 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 4b98ae82d81..ffd484b1306 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncS
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -337,6 +338,32 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
: 0;
}
+ public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+ if (outputPipeConnector instanceof IoTDBSink) {
+ ((IoTDBSink)
outputPipeConnector).setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+ }
+ }
+
+ public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+ if (outputPipeConnector instanceof IoTDBSink) {
+ ((IoTDBSink)
outputPipeConnector).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+ }
+ }
+
+ public void setTabletBatchTimeIntervalHistogram(Histogram
tabletBatchTimeIntervalHistogram) {
+ if (outputPipeConnector instanceof IoTDBSink) {
+ ((IoTDBSink) outputPipeConnector)
+
.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+ }
+ }
+
+ public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
+ if (outputPipeConnector instanceof IoTDBSink) {
+ ((IoTDBSink) outputPipeConnector)
+
.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+ }
+ }
+
//////////////////////////// Error report ////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index 52a8828b2ee..e1ddc368a5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
import org.apache.iotdb.metrics.AbstractMetricService;
-import org.apache.iotdb.metrics.impl.DoNothingHistogram;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.metrics.type.Rate;
@@ -45,14 +44,6 @@ public class PipeDataRegionSinkMetrics implements IMetricSet
{
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataRegionSinkMetrics.class);
- public static Histogram tabletBatchSizeHistogram = new DoNothingHistogram();
-
- public static Histogram tsFileBatchSizeHistogram = new DoNothingHistogram();
-
- public static Histogram tabletBatchTimeIntervalHistogram = new
DoNothingHistogram();
-
- public static Histogram tsFileBatchTimeIntervalHistogram = new
DoNothingHistogram();
-
@SuppressWarnings("java:S3077")
private volatile AbstractMetricService metricService;
@@ -75,28 +66,13 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
for (String taskID : taskIDs) {
createMetrics(taskID);
}
-
- tabletBatchSizeHistogram =
- metricService.getOrCreateHistogram(
- Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
MetricLevel.IMPORTANT);
-
- tsFileBatchSizeHistogram =
- metricService.getOrCreateHistogram(
- Metric.PIPE_TSFILE_BATCH_SIZE.toString(), MetricLevel.IMPORTANT);
-
- tabletBatchTimeIntervalHistogram =
- metricService.getOrCreateHistogram(
- Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
MetricLevel.IMPORTANT);
-
- tsFileBatchTimeIntervalHistogram =
- metricService.getOrCreateHistogram(
- Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
MetricLevel.IMPORTANT);
}
private void createMetrics(final String taskID) {
createAutoGauge(taskID);
createRate(taskID);
createTimer(taskID);
+ createHistogram(taskID);
}
private void createAutoGauge(final String taskID) {
@@ -245,6 +221,50 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
String.valueOf(connector.getCreationTime())));
}
+ private void createHistogram(final String taskID) {
+ final PipeSinkSubtask connector = connectorMap.get(taskID);
+
+ final Histogram tabletBatchSizeHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ connector.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+
+ final Histogram tsFileBatchSizeHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ connector.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+
+ final Histogram tabletBatchTimeIntervalHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+
connector.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+
+ final Histogram tsFileBatchTimeIntervalHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+
connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+ }
+
@Override
public void unbindFrom(final AbstractMetricService metricService) {
final ImmutableSet<String> taskIDs =
ImmutableSet.copyOf(connectorMap.keySet());
@@ -255,20 +275,13 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
LOGGER.warn(
"Failed to unbind from pipe data region connector metrics, connector
map not empty");
}
-
- metricService.remove(MetricType.HISTOGRAM,
Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString());
-
- metricService.remove(MetricType.HISTOGRAM,
Metric.PIPE_TSFILE_BATCH_SIZE.toString());
-
- metricService.remove(MetricType.HISTOGRAM,
Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString());
-
- metricService.remove(MetricType.HISTOGRAM,
Metric.PIPE_TSFILE_BATCH_TIME_COST.toString());
}
private void removeMetrics(final String taskID) {
removeAutoGauge(taskID);
removeRate(taskID);
removeTimer(taskID);
+ removeHistogram(taskID);
}
private void removeAutoGauge(final String taskID) {
@@ -397,6 +410,38 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
compressionTimerMap.remove(connector.getAttributeSortedString());
}
+ private void removeHistogram(final String taskID) {
+ final PipeSinkSubtask connector = connectorMap.get(taskID);
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_INSERT_NODE_BATCH_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_TSFILE_BATCH_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_INSERT_NODE_BATCH_TIME_COST.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_TSFILE_BATCH_TIME_COST.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString(),
+ Tag.CREATION_TIME.toString(),
+ String.valueOf(connector.getCreationTime()));
+ }
+
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
public void register(@NonNull final PipeSinkSubtask pipeSinkSubtask) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index c7622c4a95f..58316f4c816 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -36,6 +36,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.function.BiConsumer;
public abstract class PipeTabletEventBatch implements AutoCloseable {
@@ -43,6 +44,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
protected final List<EnrichedEvent> events = new ArrayList<>();
+ protected final BiConsumer<Long, Long> recordMetric;
private final int maxDelayInMs;
private long firstEventProcessingTime = Long.MIN_VALUE;
@@ -52,12 +54,23 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
protected volatile boolean isClosed = false;
- protected PipeTabletEventBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
+ protected PipeTabletEventBatch(
+ final int maxDelayInMs,
+ final long requestMaxBatchSizeInBytes,
+ final BiConsumer<Long, Long> recordMetric) {
if (pipeModelFixedMemoryBlock == null) {
init();
}
this.maxDelayInMs = maxDelayInMs;
+ if (recordMetric != null) {
+ this.recordMetric = recordMetric;
+ } else {
+ this.recordMetric =
+ (timeInterval, bufferSize) -> {
+ // do nothing
+ };
+ }
// limit in buffer size
this.allocatedMemoryBlock =
@@ -129,14 +142,12 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double)
diff / maxDelayInMs);
- recordMetric(diff, totalBufferSize);
+ recordMetric.accept(diff, totalBufferSize);
return true;
}
return false;
}
- protected abstract void recordMetric(final long timeInterval, final long
bufferSize);
-
private long getMaxBatchSizeInBytes() {
return allocatedMemoryBlock.getMemoryUsageInBytes();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index c637b2bff42..9e3155bb9e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReqV2;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -40,6 +39,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.BiConsumer;
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
@@ -56,7 +56,14 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new
HashMap<>();
PipeTabletEventPlainBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
- super(maxDelayInMs, requestMaxBatchSizeInBytes);
+ super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
+ }
+
+ PipeTabletEventPlainBatch(
+ final int maxDelayInMs,
+ final long requestMaxBatchSizeInBytes,
+ final BiConsumer<Long, Long> recordMetric) {
+ super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
}
@Override
@@ -72,12 +79,6 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
return true;
}
- @Override
- protected void recordMetric(long timeInterval, long bufferSize) {
-
PipeDataRegionSinkMetrics.tabletBatchTimeIntervalHistogram.update(timeInterval);
- PipeDataRegionSinkMetrics.tabletBatchSizeHistogram.update(bufferSize);
- }
-
@Override
public synchronized void onSuccess() {
super.onSuccess();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 660706a4325..cb8c25ef755 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import
org.apache.iotdb.db.pipe.sink.util.builder.PipeTableModelTsFileBuilderV2;
import org.apache.iotdb.db.pipe.sink.util.builder.PipeTreeModelTsFileBuilderV2;
@@ -45,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
@@ -59,7 +59,18 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
private final Map<Pair<String, Long>, Double> pipeName2WeightMap = new
HashMap<>();
public PipeTabletEventTsFileBatch(final int maxDelayInMs, final long
requestMaxBatchSizeInBytes) {
- super(maxDelayInMs, requestMaxBatchSizeInBytes);
+ super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
+
+ final AtomicLong tsFileIdGenerator = new AtomicLong(0);
+ treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId,
tsFileIdGenerator);
+ tableModeTsFileBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId,
tsFileIdGenerator);
+ }
+
+ public PipeTabletEventTsFileBatch(
+ final int maxDelayInMs,
+ final long requestMaxBatchSizeInBytes,
+ final BiConsumer<Long, Long> recordMetric) {
+ super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
final AtomicLong tsFileIdGenerator = new AtomicLong(0);
treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId,
tsFileIdGenerator);
@@ -126,12 +137,6 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
return true;
}
- @Override
- protected void recordMetric(long timeInterval, long bufferSize) {
-
PipeDataRegionSinkMetrics.tsFileBatchTimeIntervalHistogram.update(timeInterval);
- PipeDataRegionSinkMetrics.tsFileBatchSizeHistogram.update(bufferSize);
- }
-
private void bufferTreeModelTablet(
final String pipeName,
final long creationTime,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index d30acaf146a..9c4b74a6800 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -25,6 +25,8 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeCacheLeaderClientManager;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.metrics.impl.DoNothingHistogram;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -68,6 +70,11 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
private final int requestMaxDelayInMs;
private final long requestMaxBatchSizeInBytes;
+ private Histogram tabletBatchSizeHistogram = new DoNothingHistogram();
+ private Histogram tsFileBatchSizeHistogram = new DoNothingHistogram();
+ private Histogram tabletBatchTimeIntervalHistogram = new
DoNothingHistogram();
+ private Histogram tsFileBatchTimeIntervalHistogram = new
DoNothingHistogram();
+
// If the leader cache is disabled (or unable to find the endpoint of event
in the leader cache),
// the event will be stored in the default batch.
private final PipeTabletEventBatch defaultBatch;
@@ -113,8 +120,10 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
: CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
this.defaultBatch =
usingTsFileBatch
- ? new PipeTabletEventTsFileBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes)
- : new PipeTabletEventPlainBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes);
+ ? new PipeTabletEventTsFileBatch(
+ requestMaxDelayInMs, requestMaxBatchSizeInBytes,
this::recordTsFileMetric)
+ : new PipeTabletEventPlainBatch(
+ requestMaxDelayInMs, requestMaxBatchSizeInBytes,
this::recordTabletMetric);
}
/**
@@ -157,7 +166,9 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
endPointToBatch
.computeIfAbsent(
endPoint,
- k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs,
requestMaxBatchSizeInBytes))
+ k ->
+ new PipeTabletEventPlainBatch(
+ requestMaxDelayInMs, requestMaxBatchSizeInBytes,
this::recordTabletMetric))
.onEvent(event);
}
@@ -208,4 +219,38 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
defaultBatch.close();
endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close);
}
+
+ public void recordTabletMetric(long timeInterval, long bufferSize) {
+ this.tabletBatchTimeIntervalHistogram.update(timeInterval);
+ this.tabletBatchSizeHistogram.update(bufferSize);
+ }
+
+ public void recordTsFileMetric(long timeInterval, long bufferSize) {
+ this.tsFileBatchTimeIntervalHistogram.update(timeInterval);
+ this.tsFileBatchSizeHistogram.update(bufferSize);
+ }
+
+ public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+ if (tabletBatchSizeHistogram != null) {
+ this.tabletBatchSizeHistogram = tabletBatchSizeHistogram;
+ }
+ }
+
+ public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+ if (tsFileBatchSizeHistogram != null) {
+ this.tsFileBatchSizeHistogram = tsFileBatchSizeHistogram;
+ }
+ }
+
+ public void setTabletBatchTimeIntervalHistogram(Histogram
tabletBatchTimeIntervalHistogram) {
+ if (tabletBatchTimeIntervalHistogram != null) {
+ this.tabletBatchTimeIntervalHistogram = tabletBatchTimeIntervalHistogram;
+ }
+ }
+
+ public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
+ if (tsFileBatchTimeIntervalHistogram != null) {
+ this.tsFileBatchTimeIntervalHistogram = tsFileBatchTimeIntervalHistogram;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 9fd63a054be..38ac7e0201a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -47,6 +47,7 @@ import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferT
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler.PipeTransferTsFileHandler;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
@@ -832,4 +833,32 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
this.transferTsFileCounter = transferTsFileCounter;
}
+
+ @Override
+ public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+ }
+ }
+
+ @Override
+ public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+ }
+ }
+
+ @Override
+ public void setTabletBatchTimeIntervalHistogram(Histogram
tabletBatchTimeIntervalHistogram) {
+ if (tabletBatchBuilder != null) {
+
tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+ }
+ }
+
+ @Override
+ public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
+ if (tabletBatchBuilder != null) {
+
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 6b8327170c4..c9185b44d44 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFil
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.sink.util.cacher.LeaderCacheUtils;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -626,4 +627,32 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
public IoTDBDataNodeSyncClientManager getClientManager() {
return clientManager;
}
+
+ @Override
+ public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setTabletBatchSizeHistogram(tabletBatchSizeHistogram);
+ }
+ }
+
+ @Override
+ public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
+ }
+ }
+
+ @Override
+ public void setTabletBatchTimeIntervalHistogram(Histogram
tabletBatchTimeIntervalHistogram) {
+ if (tabletBatchBuilder != null) {
+
tabletBatchBuilder.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
+ }
+ }
+
+ @Override
+ public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
+ if (tabletBatchBuilder != null) {
+
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 471dc28bc06..38f3503b66f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.commons.pipe.sink.limiter.GlobalRPCRateLimiter;
import org.apache.iotdb.commons.pipe.sink.limiter.PipeEndPointRateLimiter;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.annotation.TableModel;
@@ -631,4 +632,20 @@ public abstract class IoTDBSink implements PipeConnector {
public PipeReceiverStatusHandler statusHandler() {
return receiverStatusHandler;
}
+
+ public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
+ // do nothing by default
+ }
+
+ public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
+ // do nothing by default
+ }
+
+ public void setTabletBatchTimeIntervalHistogram(Histogram
tabletBatchTimeIntervalHistogram) {
+ // do nothing by default
+ }
+
+ public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
+ // do nothing by default
+ }
}