This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 96100eb83e [flink] support flink sourceIdleTime metric in ReadOperator 
(#4644)
96100eb83e is described below

commit 96100eb83e8a6a2faeeef4d6d092d7dbf885d02f
Author: herefree <[email protected]>
AuthorDate: Sun Dec 8 22:24:52 2024 +0800

    [flink] support flink sourceIdleTime metric in ReadOperator (#4644)
---
 .../metrics/FileStoreSourceReaderMetrics.java      |  1 +
 .../paimon/flink/source/operator/ReadOperator.java | 24 +++++++++++++++++++++-
 .../flink/source/operator/OperatorSourceTest.java  | 16 +++++++++++++++
 3 files changed, 40 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
index 2e1e947779..a270e0ecee 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
@@ -29,6 +29,7 @@ public class FileStoreSourceReaderMetrics {
     private long lastSplitUpdateTime = UNDEFINED;
 
     public static final long UNDEFINED = -1L;
+    public static final long ACTIVE = Long.MAX_VALUE;
 
     public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
         sourceReaderMetricGroup.gauge(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index 80c85f7cdb..d884724c67 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -54,9 +54,11 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
     private transient IOManager ioManager;
 
     private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
-    // we create our own gauge for currentEmitEventTimeLag, because this 
operator is not a FLIP-27
+    // we create our own gauge for currentEmitEventTimeLag and sourceIdleTime, 
because this operator
+    // is not a FLIP-27
     // source and Flink can't automatically calculate this metric
     private transient long emitEventTimeLag = 
FileStoreSourceReaderMetrics.UNDEFINED;
+    private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
     private transient Counter numRecordsIn;
 
     public ReadOperator(ReadBuilder readBuilder) {
@@ -69,6 +71,7 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
 
         this.sourceReaderMetrics = new 
FileStoreSourceReaderMetrics(getMetricGroup());
         getMetricGroup().gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () -> 
emitEventTimeLag);
+        getMetricGroup().gauge(MetricNames.SOURCE_IDLE_TIME, 
this::getIdleTime);
         this.numRecordsIn =
                 InternalSourceReaderMetricGroup.wrap(getMetricGroup())
                         .getIOMetricGroup()
@@ -83,6 +86,7 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
         this.read = readBuilder.newRead().withIOManager(ioManager);
         this.reuseRow = new FlinkRowData(null);
         this.reuseRecord = new StreamRecord<>(reuseRow);
+        this.idlingStarted();
     }
 
     @Override
@@ -94,6 +98,8 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
                         .earliestFileCreationEpochMillis()
                         .orElse(FileStoreSourceReaderMetrics.UNDEFINED);
         sourceReaderMetrics.recordSnapshotUpdate(eventTime);
+        // update idleStartTime when reading a new split
+        idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
 
         boolean firstRecord = true;
         try (CloseableIterator<InternalRow> iterator =
@@ -113,6 +119,8 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
                 output.collect(reuseRecord);
             }
         }
+        // start idle when data sending is completed
+        this.idlingStarted();
     }
 
     @Override
@@ -122,4 +130,18 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
             ioManager.close();
         }
     }
+
+    private void idlingStarted() {
+        if (!isIdling()) {
+            idleStartTime = System.currentTimeMillis();
+        }
+    }
+
+    private boolean isIdling() {
+        return idleStartTime != FileStoreSourceReaderMetrics.ACTIVE;
+    }
+
+    private long getIdleTime() {
+        return isIdling() ? System.currentTimeMillis() - idleStartTime : 0;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 61a03a29a2..0bce8c8901 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -204,6 +204,14 @@ public class OperatorSourceTest {
                                 .getValue())
                 .isEqualTo(-1L);
 
+        Thread.sleep(300L);
+        assertThat(
+                        (Long)
+                                TestingMetricUtils.getGauge(
+                                                readerOperatorMetricGroup, 
"sourceIdleTime")
+                                        .getValue())
+                .isGreaterThan(299L);
+
         harness.processElement(new StreamRecord<>(splits.get(0)));
         assertThat(
                         (Long)
@@ -228,6 +236,14 @@ public class OperatorSourceTest {
                                                 "currentEmitEventTimeLag")
                                         .getValue())
                 .isEqualTo(emitEventTimeLag);
+
+        assertThat(
+                        (Long)
+                                TestingMetricUtils.getGauge(
+                                                readerOperatorMetricGroup, 
"sourceIdleTime")
+                                        .getValue())
+                .isGreaterThan(99L)
+                .isLessThan(300L);
     }
 
     private <T> T testReadSplit(

Reply via email to