This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d89c1d  [FLINK-23560][runtime] Calculate absoluteTimeMillis only once 
for idle metric and throughput calculation. (#16772)
6d89c1d is described below

commit 6d89c1de833ddde82d85826e428242c74de77034
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Mon Aug 16 10:54:16 2021 +0200

    [FLINK-23560][runtime] Calculate absoluteTimeMillis only once for idle 
metric and throughput calculation. (#16772)
    
    * [FLINK-23560][runtime] Calculate absoluteTimeMillis only once for idle 
metric and throughput calculation.
    
    * [hotfix][runtime] Update measurement start time only if the measurement 
was started before throughput calculation.
---
 .../apache/flink/runtime/metrics/TimerGauge.java   | 30 ++++++++++++++--
 .../runtime/throughput/ThroughputCalculator.java   | 32 ++++++++++++-----
 .../throughput/ThroughputCalculatorTest.java       | 41 +++++++++++++---------
 .../flink/streaming/runtime/tasks/StreamTask.java  | 13 ++++---
 .../streaming/runtime/tasks/StreamTaskTest.java    |  4 +--
 5 files changed, 86 insertions(+), 34 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
index 0677395..300f542 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java
@@ -47,14 +47,38 @@ public class TimerGauge implements Gauge<Long>, View {
     }
 
     public synchronized void markStart() {
+        markStartUnsafe(clock.absoluteTimeMillis());
+    }
+
+    public synchronized void markEnd() {
+        markEndUnsafe(clock.absoluteTimeMillis());
+    }
+
+    /**
+     * Duplicate of {@link #markStart()} with ability passing the time from 
outside for possible
+     * optimization on calling {@link Clock#absoluteTimeMillis()}.
+     */
+    public synchronized void markStart(long absoluteTimeMillis) {
+        markStartUnsafe(absoluteTimeMillis);
+    }
+
+    /**
+     * Duplicate of {@link #markEnd()} with ability passing the time from 
outside for possible
+     * optimization on calling {@link Clock#absoluteTimeMillis()}.
+     */
+    public synchronized void markEnd(long absoluteTimeMillis) {
+        markEndUnsafe(absoluteTimeMillis);
+    }
+
+    private void markStartUnsafe(long absoluteTimeMillis) {
         if (currentMeasurementStart == 0) {
-            currentMeasurementStart = clock.absoluteTimeMillis();
+            currentMeasurementStart = absoluteTimeMillis;
         }
     }
 
-    public synchronized void markEnd() {
+    private void markEndUnsafe(long absoluteTimeMillis) {
         if (currentMeasurementStart != 0) {
-            currentCount += clock.absoluteTimeMillis() - 
currentMeasurementStart;
+            currentCount += absoluteTimeMillis - currentMeasurementStart;
             currentMeasurementStart = 0;
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
index e6911f0..c3eee80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
@@ -36,36 +36,50 @@ public class ThroughputCalculator {
     }
 
     public void incomingDataSize(long receivedDataSize) {
-        resumeMeasurement();
+        // Force resuming measurement.
+        if (measurementStartTime == NOT_TRACKED) {
+            measurementStartTime = clock.absoluteTimeMillis();
+        }
         currentAccumulatedDataSize += receivedDataSize;
     }
 
-    /** Mark when the time should not be taken into account. */
-    public void pauseMeasurement() {
+    /**
+     * Mark when the time should not be taken into account.
+     *
+     * @param absoluteTimeMillis Current absolute time received outside to 
avoid performance drop on
+     *     calling {@link Clock#absoluteTimeMillis()} inside of the method.
+     */
+    public void pauseMeasurement(long absoluteTimeMillis) {
         if (measurementStartTime != NOT_TRACKED) {
-            currentMeasurementTime += clock.relativeTimeMillis() - 
measurementStartTime;
+            currentMeasurementTime += absoluteTimeMillis - 
measurementStartTime;
         }
         measurementStartTime = NOT_TRACKED;
     }
 
-    /** Mark when the time should be included to the throughput calculation. */
-    public void resumeMeasurement() {
+    /**
+     * Mark when the time should be included to the throughput calculation.
+     *
+     * @param absoluteTimeMillis Current absolute time received outside to 
avoid performance drop on
+     *     calling {@link Clock#absoluteTimeMillis()} inside of the method.
+     */
+    public void resumeMeasurement(long absoluteTimeMillis) {
         if (measurementStartTime == NOT_TRACKED) {
-            measurementStartTime = clock.relativeTimeMillis();
+            measurementStartTime = absoluteTimeMillis;
         }
     }
 
     /** @return Calculated throughput based on the collected data for the last 
period. */
     public long calculateThroughput() {
         if (measurementStartTime != NOT_TRACKED) {
-            currentMeasurementTime += clock.relativeTimeMillis() - 
measurementStartTime;
+            long absoluteTimeMillis = clock.absoluteTimeMillis();
+            currentMeasurementTime += absoluteTimeMillis - 
measurementStartTime;
+            measurementStartTime = absoluteTimeMillis;
         }
 
         long throughput =
                 throughputEMA.calculateThroughput(
                         currentAccumulatedDataSize, currentMeasurementTime);
 
-        measurementStartTime = clock.relativeTimeMillis();
         currentAccumulatedDataSize = currentMeasurementTime = 0;
 
         return throughput;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java
index dbe537d..4395532 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java
@@ -35,8 +35,6 @@ public class ThroughputCalculatorTest extends TestCase {
     public void testCorrectThroughputCalculation() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock, 10);
-        // Start throughput time.
-        throughputCalculator.calculateThroughput();
 
         throughputCalculator.incomingDataSize(6666);
         clock.advanceTime(Duration.ofMillis(1));
@@ -52,8 +50,6 @@ public class ThroughputCalculatorTest extends TestCase {
     public void testResetValueAfterCalculation() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock, 10);
-        // Start throughput time.
-        throughputCalculator.calculateThroughput();
 
         throughputCalculator.incomingDataSize(666);
         clock.advanceTime(Duration.ofMillis(100));
@@ -70,12 +66,10 @@ public class ThroughputCalculatorTest extends TestCase {
     public void testIgnoringIdleTime() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock, 10);
-        // Start throughput time.
-        throughputCalculator.calculateThroughput();
 
         throughputCalculator.incomingDataSize(7);
         clock.advanceTime(Duration.ofMillis(1));
-        throughputCalculator.pauseMeasurement();
+        throughputCalculator.pauseMeasurement(clock.absoluteTimeMillis());
         // This will be ignored because it is in idle now.
         clock.advanceTime(Duration.ofMillis(9));
         // This should resume the measurement time.
@@ -89,12 +83,10 @@ public class ThroughputCalculatorTest extends TestCase {
     public void testCalculationDuringIdleTime() {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock, 10);
-        // Start throughput time.
-        throughputCalculator.calculateThroughput();
 
         throughputCalculator.incomingDataSize(10);
         clock.advanceTime(Duration.ofMillis(1));
-        throughputCalculator.pauseMeasurement();
+        throughputCalculator.pauseMeasurement(clock.absoluteTimeMillis());
         // This will be ignored because it is in idle now.
         clock.advanceTime(Duration.ofMillis(9));
 
@@ -106,23 +98,40 @@ public class ThroughputCalculatorTest extends TestCase {
         ManualClock clock = new ManualClock();
         ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock, 10);
 
-        // Start throughput time.
-        throughputCalculator.calculateThroughput();
-
         throughputCalculator.incomingDataSize(10);
         // It won't be ignored.
         clock.advanceTime(Duration.ofMillis(3));
-        throughputCalculator.resumeMeasurement();
+        throughputCalculator.resumeMeasurement(clock.absoluteTimeMillis());
         // It won't be ignored.
         clock.advanceTime(Duration.ofMillis(3));
-        throughputCalculator.resumeMeasurement();
+        throughputCalculator.resumeMeasurement(clock.absoluteTimeMillis());
         // It won't be ignored.
         clock.advanceTime(Duration.ofMillis(3));
-        throughputCalculator.resumeMeasurement();
+        throughputCalculator.resumeMeasurement(clock.absoluteTimeMillis());
 
         clock.advanceTime(Duration.ofMillis(1));
 
         // resumeMeasurement should not reset the time because 
pauseMeasurement was not called.
         assertThat(throughputCalculator.calculateThroughput(), is(1_000L));
     }
+
+    @Test
+    public void testNotRestartTimerOnCalculationDuringIdleTime() {
+        ManualClock clock = new ManualClock();
+        ThroughputCalculator throughputCalculator = new 
ThroughputCalculator(clock, 10);
+
+        throughputCalculator.pauseMeasurement(clock.absoluteTimeMillis());
+
+        // Should not resume measurement.
+        throughputCalculator.calculateThroughput();
+
+        // This will be ignored because it is still in idle.
+        clock.advanceTime(Duration.ofMillis(9));
+
+        // Resume measurement.
+        throughputCalculator.incomingDataSize(10);
+        clock.advanceTime(Duration.ofMillis(1));
+
+        assertThat(throughputCalculator.calculateThroughput(), is(10L * 
1_000));
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 31110dc..b64413e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -98,6 +98,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TernaryBoolean;
 import org.apache.flink.util.WrappingRuntimeException;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.RunnableWithException;
@@ -1703,6 +1705,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
      * combine signal for metric and the throughput.
      */
     private static class ThroughputPeriodTimer implements PeriodTimer {
+        private final Clock clock = SystemClock.getInstance();
         private final TimerGauge idleTimerGauge;
         private final ThroughputCalculator throughputCalculator;
 
@@ -1714,14 +1717,16 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
 
         @Override
         public void markStart() {
-            idleTimerGauge.markStart();
-            throughputCalculator.pauseMeasurement();
+            long absoluteTimeMillis = clock.absoluteTimeMillis();
+            idleTimerGauge.markStart(absoluteTimeMillis);
+            throughputCalculator.pauseMeasurement(absoluteTimeMillis);
         }
 
         @Override
         public void markEnd() {
-            idleTimerGauge.markEnd();
-            throughputCalculator.resumeMeasurement();
+            long absoluteTimeMillis = clock.absoluteTimeMillis();
+            idleTimerGauge.markEnd(absoluteTimeMillis);
+            throughputCalculator.resumeMeasurement(absoluteTimeMillis);
         }
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 31eef9c..9bd7195 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1543,11 +1543,11 @@ public class StreamTaskTest extends TestLogger {
 
             SystemClock clock = SystemClock.getInstance();
 
-            long startTs = clock.relativeTimeMillis();
+            long startTs = clock.absoluteTimeMillis();
             throughputCalculator.incomingDataSize(incomingDataSize);
             task.invoke();
             long resultThroughput = throughputCalculator.calculateThroughput();
-            long totalDuration = clock.relativeTimeMillis() - startTs;
+            long totalDuration = clock.absoluteTimeMillis() - startTs;
 
             assertThat(
                     resultThroughput,

Reply via email to