This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f4813087f1e Pipe: Modify Sink batch event length related metrics
(#16066)
f4813087f1e is described below
commit f4813087f1ea949f1a0c8710f5b3f6d437534a4b
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jul 31 18:17:55 2025 +0800
Pipe: Modify Sink batch event length related metrics (#16066)
---
.../agent/task/subtask/sink/PipeSinkSubtask.java | 6 ++++
.../metric/sink/PipeDataRegionSinkMetrics.java | 34 +++++++++-------------
.../evolvable/batch/PipeTabletEventBatch.java | 14 +++++----
.../evolvable/batch/PipeTabletEventPlainBatch.java | 3 +-
.../batch/PipeTabletEventTsFileBatch.java | 3 +-
.../batch/PipeTransferBatchReqBuilder.java | 14 +++++++--
.../thrift/async/IoTDBDataRegionAsyncSink.java | 7 +++++
.../thrift/sync/IoTDBDataRegionSyncSink.java | 7 +++++
.../commons/pipe/sink/protocol/IoTDBSink.java | 4 +++
9 files changed, 61 insertions(+), 31 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index ffd484b1306..9d01abf24c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -364,6 +364,12 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
}
+ /**
+ * Forwards the given histogram to the output connector through
+ * {@code setBatchEventSizeHistogram}. The call has no effect unless
+ * {@code outputPipeConnector} is an {@code IoTDBSink}.
+ *
+ * @param eventSizeHistogram the histogram handed on to the sink
+ */
+ public void setEventSizeHistogram(Histogram eventSizeHistogram) {
+ if (outputPipeConnector instanceof IoTDBSink) {
+ ((IoTDBSink)
outputPipeConnector).setBatchEventSizeHistogram(eventSizeHistogram);
+ }
+ }
+
//////////////////////////// Error report ////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
index e1ddc368a5f..23024424b92 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java
@@ -135,17 +135,6 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
// Metrics related to IoTDB connector
- metricService.createAutoGauge(
- Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
- MetricLevel.IMPORTANT,
- connector,
- PipeSinkSubtask::getBatchSize,
- Tag.NAME.toString(),
- connector.getAttributeSortedString(),
- Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
- Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
metricService.createAutoGauge(
Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
MetricLevel.IMPORTANT,
@@ -263,6 +252,14 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
connector.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
+
+ Histogram eventSizeHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString());
+ connector.setEventSizeHistogram(eventSizeHistogram);
}
@Override
@@ -334,15 +331,6 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
// Metrics related to IoTDB connector
- metricService.remove(
- MetricType.AUTO_GAUGE,
- Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
- Tag.NAME.toString(),
- connector.getAttributeSortedString(),
- Tag.INDEX.toString(),
- String.valueOf(connector.getConnectorIndex()),
- Tag.CREATION_TIME.toString(),
- String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.PIPE_TOTAL_UNCOMPRESSED_SIZE.toString(),
@@ -440,6 +428,12 @@ public class PipeDataRegionSinkMetrics implements
IMetricSet {
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
+
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
+ Tag.NAME.toString(),
+ connector.getAttributeSortedString());
}
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 58316f4c816..f66fb32cf5e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -36,7 +36,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.function.BiConsumer;
public abstract class PipeTabletEventBatch implements AutoCloseable {
@@ -44,7 +43,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
private static PipeModelFixedMemoryBlock pipeModelFixedMemoryBlock = null;
protected final List<EnrichedEvent> events = new ArrayList<>();
- protected final BiConsumer<Long, Long> recordMetric;
+ protected final TriLongConsumer recordMetric;
private final int maxDelayInMs;
private long firstEventProcessingTime = Long.MIN_VALUE;
@@ -57,7 +56,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
protected PipeTabletEventBatch(
final int maxDelayInMs,
final long requestMaxBatchSizeInBytes,
- final BiConsumer<Long, Long> recordMetric) {
+ final TriLongConsumer recordMetric) {
if (pipeModelFixedMemoryBlock == null) {
init();
}
@@ -67,7 +66,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
this.recordMetric = recordMetric;
} else {
this.recordMetric =
- (timeInterval, bufferSize) -> {
+ (timeInterval, bufferSize, events) -> {
// do nothing
};
}
@@ -142,7 +141,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
final long diff = System.currentTimeMillis() - firstEventProcessingTime;
if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double)
diff / maxDelayInMs);
- recordMetric.accept(diff, totalBufferSize);
+ recordMetric.accept(diff, totalBufferSize, events.size());
return true;
}
return false;
@@ -235,4 +234,9 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
.forceAllocateForModelFixedMemoryBlock(0,
PipeMemoryBlockType.BATCH);
}
}
+
+ /**
+ * Callback that takes three primitive {@code long} arguments and returns nothing. It is the
+ * type of {@code recordMetric} in this class, which is invoked with, in order: the
+ * milliseconds elapsed since the first event was processed, the total buffer size, and the
+ * number of events currently held in the batch.
+ */
+ @FunctionalInterface
+ public interface TriLongConsumer {
+ void accept(long l1, long l2, long l3);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 9e3155bb9e8..31b10736499 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -39,7 +39,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.function.BiConsumer;
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
@@ -62,7 +61,7 @@ public class PipeTabletEventPlainBatch extends
PipeTabletEventBatch {
PipeTabletEventPlainBatch(
final int maxDelayInMs,
final long requestMaxBatchSizeInBytes,
- final BiConsumer<Long, Long> recordMetric) {
+ final TriLongConsumer recordMetric) {
super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index cb8c25ef755..275bc694397 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -44,7 +44,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
@@ -69,7 +68,7 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
public PipeTabletEventTsFileBatch(
final int maxDelayInMs,
final long requestMaxBatchSizeInBytes,
- final BiConsumer<Long, Long> recordMetric) {
+ final TriLongConsumer recordMetric) {
super(maxDelayInMs, requestMaxBatchSizeInBytes, recordMetric);
final AtomicLong tsFileIdGenerator = new AtomicLong(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 9c4b74a6800..dd4d4fe1ce6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -75,6 +75,8 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
private Histogram tabletBatchTimeIntervalHistogram = new
DoNothingHistogram();
private Histogram tsFileBatchTimeIntervalHistogram = new
DoNothingHistogram();
+ private Histogram eventSizeHistogram = new DoNothingHistogram();
+
// If the leader cache is disabled (or unable to find the endpoint of event
in the leader cache),
// the event will be stored in the default batch.
private final PipeTabletEventBatch defaultBatch;
@@ -220,14 +222,16 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close);
}
+ /**
+ * Records one sample in each histogram kept for tablet batches.
+ *
+ * @param timeInterval sample for the tablet batch time-interval histogram
+ * @param bufferSize sample for the tablet batch size histogram
+ * @param eventSize sample for the event-size histogram (presumably the number of events in
+ * the batch rather than a byte size; confirm against the caller)
+ */
- public void recordTabletMetric(long timeInterval, long bufferSize) {
+ public void recordTabletMetric(long timeInterval, long bufferSize, long
eventSize) {
this.tabletBatchTimeIntervalHistogram.update(timeInterval);
this.tabletBatchSizeHistogram.update(bufferSize);
+ this.eventSizeHistogram.update(eventSize);
}
+ /**
+ * Records one sample in each histogram kept for TsFile batches. The event-size histogram
+ * updated here is the same field that {@code recordTabletMetric} updates.
+ *
+ * @param timeInterval sample for the TsFile batch time-interval histogram
+ * @param bufferSize sample for the TsFile batch size histogram
+ * @param eventSize sample for the event-size histogram (presumably the number of events in
+ * the batch rather than a byte size; confirm against the caller)
+ */
- public void recordTsFileMetric(long timeInterval, long bufferSize) {
+ public void recordTsFileMetric(long timeInterval, long bufferSize, long
eventSize) {
this.tsFileBatchTimeIntervalHistogram.update(timeInterval);
this.tsFileBatchSizeHistogram.update(bufferSize);
+ this.eventSizeHistogram.update(eventSize);
}
public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
@@ -253,4 +257,10 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
this.tsFileBatchTimeIntervalHistogram = tsFileBatchTimeIntervalHistogram;
}
}
+
+ /**
+ * Replaces the event-size histogram, which starts out as a {@code DoNothingHistogram}.
+ * A {@code null} argument is ignored and the current histogram is kept.
+ *
+ * @param eventSizeHistogram the new histogram, or {@code null} to leave it unchanged
+ */
+ public void setEventSizeHistogram(Histogram eventSizeHistogram) {
+ if (eventSizeHistogram != null) {
+ this.eventSizeHistogram = eventSizeHistogram;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 38ac7e0201a..21fb0c9a730 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -861,4 +861,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
}
}
+
+ /**
+ * Passes the histogram on to {@code tabletBatchBuilder}. When no batch builder exists
+ * ({@code tabletBatchBuilder} is {@code null}) the call is silently ignored.
+ *
+ * @param eventSizeHistogram the histogram to install on the batch builder
+ */
+ @Override
+ public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index c9185b44d44..b3bded4a2bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -655,4 +655,11 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
tabletBatchBuilder.setTsFileBatchTimeIntervalHistogram(tsFileBatchTimeIntervalHistogram);
}
}
+
+ /**
+ * Passes the histogram on to {@code tabletBatchBuilder}. When no batch builder exists
+ * ({@code tabletBatchBuilder} is {@code null}) the call is silently ignored.
+ *
+ * @param eventSizeHistogram the histogram to install on the batch builder
+ */
+ @Override
+ public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
+ if (tabletBatchBuilder != null) {
+ tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 38f3503b66f..efee7c9c311 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -648,4 +648,8 @@ public abstract class IoTDBSink implements PipeConnector {
public void setTsFileBatchTimeIntervalHistogram(Histogram
tsFileBatchTimeIntervalHistogram) {
// do nothing by default
}
+
+ /**
+ * Receives the histogram used for batch event sizes. The base implementation ignores it;
+ * sinks that batch events override this method to install the histogram.
+ *
+ * @param eventSizeHistogram the histogram offered to this sink
+ */
+ public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
+ // do nothing by default
+ }
}