This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.20 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 756f8cfafb208051411452f43c83191c084073f9 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Tue Aug 6 16:04:23 2024 +0200 [FLINK-35886][task] Support markStart and markEnd listeners in TimerGauge --- .../apache/flink/runtime/metrics/TimerGauge.java | 60 ++++++++++++++++++---- .../flink/runtime/metrics/TimerGaugeTest.java | 58 +++++++++++++++++++++ 2 files changed, 108 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java index 645121694cf..afab5a73a0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java @@ -24,6 +24,9 @@ import org.apache.flink.metrics.View; import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; +import java.util.ArrayList; +import java.util.Collection; + /** * {@link TimerGauge} measures how much time is spent in a given state, with entry into that state * being signaled by {@link #markStart()}. Measuring is stopped by {@link #markEnd()}. This class in @@ -37,6 +40,8 @@ public class TimerGauge implements Gauge<Long>, View { private final Clock clock; + private final Collection<StartStopListener> startStopListeners = new ArrayList<>(); + /** The time-span over which the average is calculated. */ private final int timeSpanInSeconds; /** Circular array containing the history of values. */ @@ -82,21 +87,43 @@ public class TimerGauge implements Gauge<Long>, View { this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS]; } + public synchronized void registerListener(StartStopListener listener) { + if (currentMeasurementStartTS != 0) { + listener.markStart(); + } + startStopListeners.add(listener); + } + + public synchronized void unregisterListener(StartStopListener listener) { + if (currentMeasurementStartTS != 0) { + listener.markEnd(); + } + startStopListeners.remove(listener); + } + public synchronized void markStart() { - if (currentMeasurementStartTS == 0) { - currentUpdateTS = clock.absoluteTimeMillis(); - currentMeasurementStartTS = currentUpdateTS; + if (currentMeasurementStartTS != 0) { + return; + } + currentUpdateTS = clock.absoluteTimeMillis(); + currentMeasurementStartTS = currentUpdateTS; + for (StartStopListener startStopListener : startStopListeners) { + startStopListener.markStart(); } } public synchronized void markEnd() { - if (currentMeasurementStartTS != 0) { - long currentMeasurement = clock.absoluteTimeMillis() - currentMeasurementStartTS; - currentCount += currentMeasurement; - accumulatedCount += currentMeasurement; - currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, currentMeasurement); - currentUpdateTS = 0; - currentMeasurementStartTS = 0; + if (currentMeasurementStartTS == 0) { + return; + } + long currentMeasurement = clock.absoluteTimeMillis() - currentMeasurementStartTS; + currentCount += currentMeasurement; + accumulatedCount += currentMeasurement; + currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, currentMeasurement); + currentUpdateTS = 0; + currentMeasurementStartTS = 0; + for (StartStopListener startStopListener : startStopListeners) { + startStopListener.markEnd(); } } @@ -164,4 +191,17 @@ public class TimerGauge implements Gauge<Long>, View { public synchronized boolean isMeasuring() { return currentMeasurementStartTS != 0; } + + /** + * Listens for {@link TimerGauge#markStart()} and {@link TimerGauge#markEnd()} events. + * + * <p>Beware! As it is right now, {@link StartStopListener} is notified under the {@link + * TimerGauge}'s lock, so those callbacks should be very short, without long call stacks that + * acquire more locks. Otherwise, a potential for deadlocks can be introduced. + */ + public interface StartStopListener { + void markStart(); + + void markEnd(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java index 7416e3292f7..ebc2d7c313a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java @@ -28,6 +28,7 @@ package org.apache.flink.runtime.metrics; import org.apache.flink.metrics.View; import org.apache.flink.util.clock.ManualClock; +import org.apache.flink.util.clock.SystemClock; import org.junit.jupiter.api.Test; @@ -148,4 +149,61 @@ class TimerGaugeTest { assertThat(gauge.getMaxSingleMeasurement()).isEqualTo(SLEEP / 2); assertThat(gauge.getAccumulatedCount()).isEqualTo(3 * SLEEP + SLEEP / 2); } + + @Test + void testListeners() { + TimerGauge gauge = new TimerGauge(SystemClock.getInstance(), View.UPDATE_INTERVAL_SECONDS); + TestStartStopListener listener1 = new TestStartStopListener(); + TestStartStopListener listener2 = new TestStartStopListener(); + + gauge.registerListener(listener1); + + gauge.markStart(); + listener1.assertCounts(1, 0); + gauge.markEnd(); + listener1.assertCounts(1, 1); + + gauge.markStart(); + gauge.registerListener(listener2); + listener1.assertCounts(2, 1); + listener2.assertCounts(1, 0); + gauge.markEnd(); + listener1.assertCounts(2, 2); + listener2.assertCounts(1, 1); + + gauge.markStart(); + gauge.unregisterListener(listener1); + listener1.assertCounts(3, 3); + listener2.assertCounts(2, 1); + + gauge.markEnd(); + listener2.assertCounts(2, 2); + + gauge.unregisterListener(listener2); + + gauge.markStart(); + gauge.markEnd(); + listener1.assertCounts(3, 3); + listener2.assertCounts(2, 2); + } + + static class TestStartStopListener implements TimerGauge.StartStopListener { + long startCount; + long endCount; + + @Override + public void markStart() { + startCount++; + } + + @Override + public void markEnd() { + endCount++; + } + + public void assertCounts(long expectedStart, long expectedEnd) { + assertThat(startCount).isEqualTo(expectedStart); + assertThat(endCount).isEqualTo(expectedEnd); + } + } }