This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cd9476825237dac441812cdca178f689f757b14
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Tue Aug 6 16:04:23 2024 +0200

    [FLINK-35886][task] Support markStart and markEnd listeners in TimerGauge
---
 .../apache/flink/runtime/metrics/TimerGauge.java   | 60 ++++++++++++++++++----
 .../flink/runtime/metrics/TimerGaugeTest.java      | 58 +++++++++++++++++++++
 2 files changed, 108 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
index 645121694cf..afab5a73a0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
@@ -24,6 +24,9 @@ import org.apache.flink.metrics.View;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
 
+import java.util.ArrayList;
+import java.util.Collection;
+
 /**
  * {@link TimerGauge} measures how much time is spent in a given state, with 
entry into that state
  * being signaled by {@link #markStart()}. Measuring is stopped by {@link 
#markEnd()}. This class in
@@ -37,6 +40,8 @@ public class TimerGauge implements Gauge<Long>, View {
 
     private final Clock clock;
 
+    private final Collection<StartStopListener> startStopListeners = new 
ArrayList<>();
+
     /** The time-span over which the average is calculated. */
     private final int timeSpanInSeconds;
     /** Circular array containing the history of values. */
@@ -82,21 +87,43 @@ public class TimerGauge implements Gauge<Long>, View {
         this.values = new long[this.timeSpanInSeconds / 
UPDATE_INTERVAL_SECONDS];
     }
 
+    public synchronized void registerListener(StartStopListener listener) {
+        if (currentMeasurementStartTS != 0) {
+            listener.markStart();
+        }
+        startStopListeners.add(listener);
+    }
+
+    public synchronized void unregisterListener(StartStopListener listener) {
+        if (currentMeasurementStartTS != 0) {
+            listener.markEnd();
+        }
+        startStopListeners.remove(listener);
+    }
+
     public synchronized void markStart() {
-        if (currentMeasurementStartTS == 0) {
-            currentUpdateTS = clock.absoluteTimeMillis();
-            currentMeasurementStartTS = currentUpdateTS;
+        if (currentMeasurementStartTS != 0) {
+            return;
+        }
+        currentUpdateTS = clock.absoluteTimeMillis();
+        currentMeasurementStartTS = currentUpdateTS;
+        for (StartStopListener startStopListener : startStopListeners) {
+            startStopListener.markStart();
         }
     }
 
     public synchronized void markEnd() {
-        if (currentMeasurementStartTS != 0) {
-            long currentMeasurement = clock.absoluteTimeMillis() - 
currentMeasurementStartTS;
-            currentCount += currentMeasurement;
-            accumulatedCount += currentMeasurement;
-            currentMaxSingleMeasurement = 
Math.max(currentMaxSingleMeasurement, currentMeasurement);
-            currentUpdateTS = 0;
-            currentMeasurementStartTS = 0;
+        if (currentMeasurementStartTS == 0) {
+            return;
+        }
+        long currentMeasurement = clock.absoluteTimeMillis() - 
currentMeasurementStartTS;
+        currentCount += currentMeasurement;
+        accumulatedCount += currentMeasurement;
+        currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, 
currentMeasurement);
+        currentUpdateTS = 0;
+        currentMeasurementStartTS = 0;
+        for (StartStopListener startStopListener : startStopListeners) {
+            startStopListener.markEnd();
         }
     }
 
@@ -164,4 +191,17 @@ public class TimerGauge implements Gauge<Long>, View {
     public synchronized boolean isMeasuring() {
         return currentMeasurementStartTS != 0;
     }
+
+    /**
+     * Listens for {@link TimerGauge#markStart()} and {@link 
TimerGauge#markEnd()} events.
+     *
+     * <p>Beware! As it is right now, {@link StartStopListener} is notified 
under the {@link
+     * TimerGauge}'s lock, so those callbacks should be very short, without 
long call stacks that
+     * acquire more locks. Otherwise, a potential for deadlocks can be 
introduced.
+     */
+    public interface StartStopListener {
+        void markStart();
+
+        void markEnd();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
index 7416e3292f7..ebc2d7c313a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TimerGaugeTest.java
@@ -28,6 +28,7 @@ package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.metrics.View;
 import org.apache.flink.util.clock.ManualClock;
+import org.apache.flink.util.clock.SystemClock;
 
 import org.junit.jupiter.api.Test;
 
@@ -148,4 +149,61 @@ class TimerGaugeTest {
         assertThat(gauge.getMaxSingleMeasurement()).isEqualTo(SLEEP / 2);
         assertThat(gauge.getAccumulatedCount()).isEqualTo(3 * SLEEP + SLEEP / 
2);
     }
+
+    @Test
+    void testListeners() {
+        TimerGauge gauge = new TimerGauge(SystemClock.getInstance(), 
View.UPDATE_INTERVAL_SECONDS);
+        TestStartStopListener listener1 = new TestStartStopListener();
+        TestStartStopListener listener2 = new TestStartStopListener();
+
+        gauge.registerListener(listener1);
+
+        gauge.markStart();
+        listener1.assertCounts(1, 0);
+        gauge.markEnd();
+        listener1.assertCounts(1, 1);
+
+        gauge.markStart();
+        gauge.registerListener(listener2);
+        listener1.assertCounts(2, 1);
+        listener2.assertCounts(1, 0);
+        gauge.markEnd();
+        listener1.assertCounts(2, 2);
+        listener2.assertCounts(1, 1);
+
+        gauge.markStart();
+        gauge.unregisterListener(listener1);
+        listener1.assertCounts(3, 3);
+        listener2.assertCounts(2, 1);
+
+        gauge.markEnd();
+        listener2.assertCounts(2, 2);
+
+        gauge.unregisterListener(listener2);
+
+        gauge.markStart();
+        gauge.markEnd();
+        listener1.assertCounts(3, 3);
+        listener2.assertCounts(2, 2);
+    }
+
+    static class TestStartStopListener implements TimerGauge.StartStopListener 
{
+        long startCount;
+        long endCount;
+
+        @Override
+        public void markStart() {
+            startCount++;
+        }
+
+        @Override
+        public void markEnd() {
+            endCount++;
+        }
+
+        public void assertCounts(long expectedStart, long expectedEnd) {
+            assertThat(startCount).isEqualTo(expectedStart);
+            assertThat(endCount).isEqualTo(expectedEnd);
+        }
+    }
 }

Reply via email to