This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new de5409afbb0 [FLINK-29244][state/changelog] Add metric lastMaterializationDuration to ChangelogMaterializationMetricGroup
de5409afbb0 is described below

commit de5409afbb003b108518ac5cc2e5ba215063c2b6
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Fri Sep 9 15:52:07 2022 +0800

    [FLINK-29244][state/changelog] Add metric lastMaterializationDuration to ChangelogMaterializationMetricGroup
---
 docs/content.zh/docs/ops/metrics.md                           |  7 ++++++-
 docs/content/docs/ops/metrics.md                              |  7 ++++++-
 .../flink/state/changelog/ChangelogMetricGroupTest.java       | 10 ++++++++++
 .../state/common/ChangelogMaterializationMetricGroup.java     | 11 ++++++++++-
 .../flink/state/common/PeriodicMaterializationManager.java    |  9 +++++++--
 5 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index b35843903b6..17d26de9dca 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1571,7 +1571,7 @@ Note that the metrics are only available via reporters.
       <td>Gauge</td>
     </tr>
     <tr>
-      <th rowspan="7"><strong>Task/Operator</strong></th>
+      <th rowspan="8"><strong>Task/Operator</strong></th>
       <td>startedMaterialization</td>
       <td>The number of started materializations.</td>
       <td>Counter</td>
@@ -1586,6 +1586,11 @@ Note that the metrics are only available via reporters.
       <td>The number of failed materializations.</td>
       <td>Counter</td>
     </tr>
+    <tr>
+      <td>lastDurationOfMaterialization</td>
+      <td>The duration of the last materialization (in milliseconds).</td>
+      <td>Gauge</td>
+    </tr>
     <tr>
       <td>lastFullSizeOfMaterialization</td>
       <td>The full size of the materialization part of the last reported checkpoint (in bytes).</td>
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 94600081fbb..2748addaf8d 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1556,7 +1556,7 @@ Note that the metrics are only available via reporters.
       <td>Gauge</td>
     </tr>
     <tr>
-      <th rowspan="7"><strong>Task/Operator</strong></th>
+      <th rowspan="8"><strong>Task/Operator</strong></th>
       <td>startedMaterialization</td>
       <td>The number of started materializations.</td>
       <td>Counter</td>
@@ -1571,6 +1571,11 @@ Note that the metrics are only available via reporters.
       <td>The number of failed materializations.</td>
       <td>Counter</td>
     </tr>
+    <tr>
+      <td>lastDurationOfMaterialization</td>
+      <td>The duration of the last materialization (in milliseconds).</td>
+      <td>Gauge</td>
+    </tr>
     <tr>
       <td>lastFullSizeOfMaterialization</td>
       <td>The full size of the materialization part of the last reported checkpoint (in bytes).</td>
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java
index f032ea862d1..6bfe146d701 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java
@@ -56,6 +56,7 @@ import static 
org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.
 import static 
org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.LATEST_INC_SIZE_OF_NON_MATERIALIZATION;
 import static 
org.apache.flink.state.common.ChangelogMaterializationMetricGroup.COMPLETED_MATERIALIZATION;
 import static 
org.apache.flink.state.common.ChangelogMaterializationMetricGroup.FAILED_MATERIALIZATION;
+import static 
org.apache.flink.state.common.ChangelogMaterializationMetricGroup.LAST_DURATION_OF_MATERIALIZATION;
 import static 
org.apache.flink.state.common.ChangelogMaterializationMetricGroup.STARTED_MATERIALIZATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -73,6 +74,7 @@ public class ChangelogMetricGroupTest {
     private Counter startedMaterializationCounter;
     private Counter completedMaterializationCounter;
     private Counter failedMaterializationCounter;
+    private Gauge<Long> lastDurationOfMaterializationGauge;
     private Gauge<Long> lastFullSizeOfMaterializationGauge;
     private Gauge<Long> lastIncSizeOfMaterializationGauge;
     private Gauge<Long> lastFullSizeOfNonMaterializationGauge;
@@ -83,10 +85,12 @@ public class ChangelogMetricGroupTest {
         setup(snapshotResult -> snapshotResult);
 
         // The materialization will be skipped if no data updated.
+        assertEquals(-1L, 
lastDurationOfMaterializationGauge.getValue().longValue());
         periodicMaterializationManager.triggerMaterialization();
         runSnapshot(1L);
         assertEquals(1L, startedMaterializationCounter.getCount());
         assertEquals(1L, completedMaterializationCounter.getCount());
+        assertNotEquals(-1L, 
lastDurationOfMaterializationGauge.getValue().longValue());
         assertEquals(0L, 
lastFullSizeOfMaterializationGauge.getValue().longValue());
         assertEquals(0L, 
lastIncSizeOfMaterializationGauge.getValue().longValue());
         assertEquals(0L, 
lastFullSizeOfNonMaterializationGauge.getValue().longValue());
@@ -104,6 +108,7 @@ public class ChangelogMetricGroupTest {
         Long lastIncSizeOfNonMaterialization = 
lastIncSizeOfNonMaterializationGauge.getValue();
         assertNotEquals(0L, lastFullSizeOfMaterialization.longValue());
         assertNotEquals(0L, lastIncSizeOfMaterialization.longValue());
+        assertNotEquals(-1L, 
lastDurationOfMaterializationGauge.getValue().longValue());
         // The non-materialization size will be zero if no data updated 
between completed
         // materialization and checkpoint.
         assertEquals(0L, lastFullSizeOfNonMaterialization.longValue());
@@ -128,11 +133,13 @@ public class ChangelogMetricGroupTest {
         setup(snapshotResult -> ExceptionallyDoneFuture.of(new 
RuntimeException()));
         changelogKeyedStateBackend.setCurrentKey(1);
         state.update(1);
+        assertEquals(-1L, 
lastDurationOfMaterializationGauge.getValue().longValue());
         periodicMaterializationManager.triggerMaterialization();
         runSnapshot(1L);
         assertEquals(0L, completedMaterializationCounter.getCount());
         assertEquals(1L, failedMaterializationCounter.getCount());
         assertEquals(1L, startedMaterializationCounter.getCount());
+        assertEquals(-1L, 
lastDurationOfMaterializationGauge.getValue().longValue());
         assertEquals(0L, 
lastFullSizeOfMaterializationGauge.getValue().longValue());
         assertEquals(0L, 
lastIncSizeOfMaterializationGauge.getValue().longValue());
         assertNotEquals(0L, 
lastFullSizeOfNonMaterializationGauge.getValue().longValue());
@@ -175,6 +182,9 @@ public class ChangelogMetricGroupTest {
                 
Preconditions.checkNotNull(counterMap.get(COMPLETED_MATERIALIZATION));
         failedMaterializationCounter =
                 
Preconditions.checkNotNull(counterMap.get(FAILED_MATERIALIZATION));
+        lastDurationOfMaterializationGauge =
+                Preconditions.checkNotNull(
+                        (Gauge<Long>) 
gaugeMap.get(LAST_DURATION_OF_MATERIALIZATION));
         lastFullSizeOfMaterializationGauge =
                 Preconditions.checkNotNull(
                         (Gauge<Long>) 
gaugeMap.get(LATEST_FULL_SIZE_OF_MATERIALIZATION));
diff --git a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java
index 1a2115d5c1c..d1c1b5c1d92 100644
--- 
a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java
+++ 
b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java
@@ -39,10 +39,16 @@ public class ChangelogMaterializationMetricGroup extends 
ProxyMetricGroup<Metric
     @VisibleForTesting
     public static final String FAILED_MATERIALIZATION = PREFIX + 
".failedMaterialization";
 
+    @VisibleForTesting
+    public static final String LAST_DURATION_OF_MATERIALIZATION =
+            PREFIX + ".lastDurationOfMaterialization";
+
     private final Counter startedMaterializationCounter;
     private final Counter completedMaterializationCounter;
     private final Counter failedMaterializationCounter;
 
+    private volatile long lastDuration = -1;
+
     public ChangelogMaterializationMetricGroup(MetricGroup parentMetricGroup) {
         super(parentMetricGroup);
         this.startedMaterializationCounter =
@@ -51,14 +57,17 @@ public class ChangelogMaterializationMetricGroup extends 
ProxyMetricGroup<Metric
                 counter(COMPLETED_MATERIALIZATION, new 
ThreadSafeSimpleCounter());
         this.failedMaterializationCounter =
                 counter(FAILED_MATERIALIZATION, new ThreadSafeSimpleCounter());
+
+        gauge(LAST_DURATION_OF_MATERIALIZATION, () -> lastDuration);
     }
 
     void reportStartedMaterialization() {
         startedMaterializationCounter.inc();
     }
 
-    void reportCompletedMaterialization() {
+    void reportCompletedMaterialization(long duration) {
         completedMaterializationCounter.inc();
+        lastDuration = duration;
     }
 
     void reportFailedMaterialization() {
diff --git a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
index 96926e98841..d7af80c1752 100644
--- 
a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
+++ 
b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
@@ -202,6 +202,7 @@ public class PeriodicMaterializationManager implements 
Closeable {
     public void triggerMaterialization() {
         mailboxExecutor.execute(
                 () -> {
+                    long triggerTime = System.currentTimeMillis();
                     metrics.reportStartedMaterialization();
                     Optional<MaterializationRunnable> 
materializationRunnableOptional;
                     try {
@@ -216,11 +217,13 @@ public class PeriodicMaterializationManager implements 
Closeable {
                         asyncOperationsThreadPool.execute(
                                 () ->
                                         asyncMaterializationPhase(
+                                                triggerTime,
                                                 
runnable.getMaterializationRunnable(),
                                                 
runnable.getMaterializationID(),
                                                 runnable.getMaterializedTo()));
                     } else {
-                        metrics.reportCompletedMaterialization();
+                        metrics.reportCompletedMaterialization(
+                                System.currentTimeMillis() - triggerTime);
                         scheduleNextMaterialization();
 
                         LOG.info(
@@ -234,6 +237,7 @@ public class PeriodicMaterializationManager implements 
Closeable {
     }
 
     private void asyncMaterializationPhase(
+            long triggerTime,
             RunnableFuture<SnapshotResult<KeyedStateHandle>> 
materializedRunnableFuture,
             long materializationID,
             SequenceNumber upTo) {
@@ -250,7 +254,8 @@ public class PeriodicMaterializationManager implements 
Closeable {
                                             try {
                                                 
target.handleMaterializationResult(
                                                         snapshotResult, 
materializationID, upTo);
-                                                
metrics.reportCompletedMaterialization();
+                                                
metrics.reportCompletedMaterialization(
+                                                        
System.currentTimeMillis() - triggerTime);
                                             } catch (Exception ex) {
                                                 
metrics.reportFailedMaterialization();
                                             }

Reply via email to