This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new de5409afbb0 [FLINK-29244][state/changelog] Add metric lastMaterializationDuration to ChangelogMaterializationMetricGroup de5409afbb0 is described below commit de5409afbb003b108518ac5cc2e5ba215063c2b6 Author: wangfeifan <zoltar9...@163.com> AuthorDate: Fri Sep 9 15:52:07 2022 +0800 [FLINK-29244][state/changelog] Add metric lastMaterializationDuration to ChangelogMaterializationMetricGroup --- docs/content.zh/docs/ops/metrics.md | 7 ++++++- docs/content/docs/ops/metrics.md | 7 ++++++- .../flink/state/changelog/ChangelogMetricGroupTest.java | 10 ++++++++++ .../state/common/ChangelogMaterializationMetricGroup.java | 11 ++++++++++- .../flink/state/common/PeriodicMaterializationManager.java | 9 +++++++-- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md index b35843903b6..17d26de9dca 100644 --- a/docs/content.zh/docs/ops/metrics.md +++ b/docs/content.zh/docs/ops/metrics.md @@ -1571,7 +1571,7 @@ Note that the metrics are only available via reporters. <td>Gauge</td> </tr> <tr> - <th rowspan="7"><strong>Task/Operator</strong></th> + <th rowspan="8"><strong>Task/Operator</strong></th> <td>startedMaterialization</td> <td>The number of started materializations.</td> <td>Counter</td> @@ -1586,6 +1586,11 @@ Note that the metrics are only available via reporters. <td>The number of failed materializations.</td> <td>Counter</td> </tr> + <tr> + <td>lastDurationOfMaterialization</td> + <td>The duration of the last materialization (in milliseconds).</td> + <td>Gauge</td> + </tr> <tr> <td>lastFullSizeOfMaterialization</td> <td>The full size of the materialization part of the last reported checkpoint (in bytes).</td> diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md index 94600081fbb..2748addaf8d 100644 --- a/docs/content/docs/ops/metrics.md +++ b/docs/content/docs/ops/metrics.md @@ -1556,7 +1556,7 @@ Note that the metrics are only available via reporters. <td>Gauge</td> </tr> <tr> - <th rowspan="7"><strong>Task/Operator</strong></th> + <th rowspan="8"><strong>Task/Operator</strong></th> <td>startedMaterialization</td> <td>The number of started materializations.</td> <td>Counter</td> @@ -1571,6 +1571,11 @@ Note that the metrics are only available via reporters. <td>The number of failed materializations.</td> <td>Counter</td> </tr> + <tr> + <td>lastDurationOfMaterialization</td> + <td>The duration of the last materialization (in milliseconds).</td> + <td>Gauge</td> + </tr> <tr> <td>lastFullSizeOfMaterialization</td> <td>The full size of the materialization part of the last reported checkpoint (in bytes).</td> diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java index f032ea862d1..6bfe146d701 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java @@ -56,6 +56,7 @@ import static org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup. import static org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.LATEST_INC_SIZE_OF_NON_MATERIALIZATION; import static org.apache.flink.state.common.ChangelogMaterializationMetricGroup.COMPLETED_MATERIALIZATION; import static org.apache.flink.state.common.ChangelogMaterializationMetricGroup.FAILED_MATERIALIZATION; +import static org.apache.flink.state.common.ChangelogMaterializationMetricGroup.LAST_DURATION_OF_MATERIALIZATION; import static org.apache.flink.state.common.ChangelogMaterializationMetricGroup.STARTED_MATERIALIZATION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -73,6 +74,7 @@ public class ChangelogMetricGroupTest { private Counter startedMaterializationCounter; private Counter completedMaterializationCounter; private Counter failedMaterializationCounter; + private Gauge<Long> lastDurationOfMaterializationGauge; private Gauge<Long> lastFullSizeOfMaterializationGauge; private Gauge<Long> lastIncSizeOfMaterializationGauge; private Gauge<Long> lastFullSizeOfNonMaterializationGauge; @@ -83,10 +85,12 @@ public class ChangelogMetricGroupTest { setup(snapshotResult -> snapshotResult); // The materialization will be skipped if no data updated. + assertEquals(-1L, lastDurationOfMaterializationGauge.getValue().longValue()); periodicMaterializationManager.triggerMaterialization(); runSnapshot(1L); assertEquals(1L, startedMaterializationCounter.getCount()); assertEquals(1L, completedMaterializationCounter.getCount()); + assertNotEquals(-1L, lastDurationOfMaterializationGauge.getValue().longValue()); assertEquals(0L, lastFullSizeOfMaterializationGauge.getValue().longValue()); assertEquals(0L, lastIncSizeOfMaterializationGauge.getValue().longValue()); assertEquals(0L, lastFullSizeOfNonMaterializationGauge.getValue().longValue()); @@ -104,6 +108,7 @@ public class ChangelogMetricGroupTest { Long lastIncSizeOfNonMaterialization = lastIncSizeOfNonMaterializationGauge.getValue(); assertNotEquals(0L, lastFullSizeOfMaterialization.longValue()); assertNotEquals(0L, lastIncSizeOfMaterialization.longValue()); + assertNotEquals(-1L, lastDurationOfMaterializationGauge.getValue().longValue()); // The non-materialization size will be zero if no data updated between completed // materialization and checkpoint. assertEquals(0L, lastFullSizeOfNonMaterialization.longValue()); @@ -128,11 +133,13 @@ public class ChangelogMetricGroupTest { setup(snapshotResult -> ExceptionallyDoneFuture.of(new RuntimeException())); changelogKeyedStateBackend.setCurrentKey(1); state.update(1); + assertEquals(-1L, lastDurationOfMaterializationGauge.getValue().longValue()); periodicMaterializationManager.triggerMaterialization(); runSnapshot(1L); assertEquals(0L, completedMaterializationCounter.getCount()); assertEquals(1L, failedMaterializationCounter.getCount()); assertEquals(1L, startedMaterializationCounter.getCount()); + assertEquals(-1L, lastDurationOfMaterializationGauge.getValue().longValue()); assertEquals(0L, lastFullSizeOfMaterializationGauge.getValue().longValue()); assertEquals(0L, lastIncSizeOfMaterializationGauge.getValue().longValue()); assertNotEquals(0L, lastFullSizeOfNonMaterializationGauge.getValue().longValue()); @@ -175,6 +182,9 @@ public class ChangelogMetricGroupTest { Preconditions.checkNotNull(counterMap.get(COMPLETED_MATERIALIZATION)); failedMaterializationCounter = Preconditions.checkNotNull(counterMap.get(FAILED_MATERIALIZATION)); + lastDurationOfMaterializationGauge = + Preconditions.checkNotNull( + (Gauge<Long>) gaugeMap.get(LAST_DURATION_OF_MATERIALIZATION)); lastFullSizeOfMaterializationGauge = Preconditions.checkNotNull( (Gauge<Long>) gaugeMap.get(LATEST_FULL_SIZE_OF_MATERIALIZATION)); diff --git a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java index 1a2115d5c1c..d1c1b5c1d92 100644 --- a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java +++ b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java @@ -39,10 +39,16 @@ public class ChangelogMaterializationMetricGroup extends ProxyMetricGroup<Metric @VisibleForTesting public static final String FAILED_MATERIALIZATION = PREFIX + ".failedMaterialization"; + @VisibleForTesting + public static final String LAST_DURATION_OF_MATERIALIZATION = + PREFIX + ".lastDurationOfMaterialization"; + private final Counter startedMaterializationCounter; private final Counter completedMaterializationCounter; private final Counter failedMaterializationCounter; + private volatile long lastDuration = -1; + public ChangelogMaterializationMetricGroup(MetricGroup parentMetricGroup) { super(parentMetricGroup); this.startedMaterializationCounter = @@ -51,14 +57,17 @@ public class ChangelogMaterializationMetricGroup extends ProxyMetricGroup<Metric counter(COMPLETED_MATERIALIZATION, new ThreadSafeSimpleCounter()); this.failedMaterializationCounter = counter(FAILED_MATERIALIZATION, new ThreadSafeSimpleCounter()); + + gauge(LAST_DURATION_OF_MATERIALIZATION, () -> lastDuration); } void reportStartedMaterialization() { startedMaterializationCounter.inc(); } - void reportCompletedMaterialization() { + void reportCompletedMaterialization(long duration) { completedMaterializationCounter.inc(); + lastDuration = duration; } void reportFailedMaterialization() { diff --git a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java index 96926e98841..d7af80c1752 100644 --- a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java +++ b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java @@ -202,6 +202,7 @@ public class PeriodicMaterializationManager implements Closeable { public void triggerMaterialization() { mailboxExecutor.execute( () -> { + long triggerTime = System.currentTimeMillis(); metrics.reportStartedMaterialization(); Optional<MaterializationRunnable> materializationRunnableOptional; try { @@ -216,11 +217,13 @@ public class PeriodicMaterializationManager implements Closeable { asyncOperationsThreadPool.execute( () -> asyncMaterializationPhase( + triggerTime, runnable.getMaterializationRunnable(), runnable.getMaterializationID(), runnable.getMaterializedTo())); } else { - metrics.reportCompletedMaterialization(); + metrics.reportCompletedMaterialization( + System.currentTimeMillis() - triggerTime); scheduleNextMaterialization(); LOG.info( @@ -234,6 +237,7 @@ public class PeriodicMaterializationManager implements Closeable { } private void asyncMaterializationPhase( + long triggerTime, RunnableFuture<SnapshotResult<KeyedStateHandle>> materializedRunnableFuture, long materializationID, SequenceNumber upTo) { @@ -250,7 +254,8 @@ public class PeriodicMaterializationManager implements Closeable { try { target.handleMaterializationResult( snapshotResult, materializationID, upTo); - metrics.reportCompletedMaterialization(); + metrics.reportCompletedMaterialization( + System.currentTimeMillis() - triggerTime); } catch (Exception ex) { metrics.reportFailedMaterialization(); }