This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 96100eb83e [flink] support flink sourceIdleTime metric in ReadOperator
(#4644)
96100eb83e is described below
commit 96100eb83e8a6a2faeeef4d6d092d7dbf885d02f
Author: herefree <[email protected]>
AuthorDate: Sun Dec 8 22:24:52 2024 +0800
[flink] support flink sourceIdleTime metric in ReadOperator (#4644)
---
.../metrics/FileStoreSourceReaderMetrics.java | 1 +
.../paimon/flink/source/operator/ReadOperator.java | 24 +++++++++++++++++++++-
.../flink/source/operator/OperatorSourceTest.java | 16 +++++++++++++++
3 files changed, 40 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
index 2e1e947779..a270e0ecee 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
@@ -29,6 +29,7 @@ public class FileStoreSourceReaderMetrics {
private long lastSplitUpdateTime = UNDEFINED;
public static final long UNDEFINED = -1L;
+ public static final long ACTIVE = Long.MAX_VALUE;
public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
sourceReaderMetricGroup.gauge(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index 80c85f7cdb..d884724c67 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -54,9 +54,11 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
private transient IOManager ioManager;
private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
- // we create our own gauge for currentEmitEventTimeLag, because this
operator is not a FLIP-27
+ // we create our own gauge for currentEmitEventTimeLag and sourceIdleTime,
because this operator
+ // is not a FLIP-27
// source and Flink can't automatically calculate this metric
private transient long emitEventTimeLag =
FileStoreSourceReaderMetrics.UNDEFINED;
+ private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
private transient Counter numRecordsIn;
public ReadOperator(ReadBuilder readBuilder) {
@@ -69,6 +71,7 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
this.sourceReaderMetrics = new
FileStoreSourceReaderMetrics(getMetricGroup());
getMetricGroup().gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () ->
emitEventTimeLag);
+ getMetricGroup().gauge(MetricNames.SOURCE_IDLE_TIME,
this::getIdleTime);
this.numRecordsIn =
InternalSourceReaderMetricGroup.wrap(getMetricGroup())
.getIOMetricGroup()
@@ -83,6 +86,7 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
this.read = readBuilder.newRead().withIOManager(ioManager);
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(reuseRow);
+ this.idlingStarted();
}
@Override
@@ -94,6 +98,8 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
.earliestFileCreationEpochMillis()
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
sourceReaderMetrics.recordSnapshotUpdate(eventTime);
+ // update idleStartTime when reading a new split
+ idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
boolean firstRecord = true;
try (CloseableIterator<InternalRow> iterator =
@@ -113,6 +119,8 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
output.collect(reuseRecord);
}
}
+ // start idle when data sending is completed
+ this.idlingStarted();
}
@Override
@@ -122,4 +130,18 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
ioManager.close();
}
}
+
+ private void idlingStarted() {
+ if (!isIdling()) {
+ idleStartTime = System.currentTimeMillis();
+ }
+ }
+
+ private boolean isIdling() {
+ return idleStartTime != FileStoreSourceReaderMetrics.ACTIVE;
+ }
+
+ private long getIdleTime() {
+ return isIdling() ? System.currentTimeMillis() - idleStartTime : 0;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 61a03a29a2..0bce8c8901 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -204,6 +204,14 @@ public class OperatorSourceTest {
.getValue())
.isEqualTo(-1L);
+ Thread.sleep(300L);
+ assertThat(
+ (Long)
+ TestingMetricUtils.getGauge(
+ readerOperatorMetricGroup,
"sourceIdleTime")
+ .getValue())
+ .isGreaterThan(299L);
+
harness.processElement(new StreamRecord<>(splits.get(0)));
assertThat(
(Long)
@@ -228,6 +236,14 @@ public class OperatorSourceTest {
"currentEmitEventTimeLag")
.getValue())
.isEqualTo(emitEventTimeLag);
+
+ assertThat(
+ (Long)
+ TestingMetricUtils.getGauge(
+ readerOperatorMetricGroup,
"sourceIdleTime")
+ .getValue())
+ .isGreaterThan(99L)
+ .isLessThan(300L);
}
private <T> T testReadSplit(