This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6d89c1d [FLINK-23560][runtime] Calculate absoluteTimeMillis only once for idle metric and throughput calculation. (#16772) 6d89c1d is described below commit 6d89c1de833ddde82d85826e428242c74de77034 Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Mon Aug 16 10:54:16 2021 +0200 [FLINK-23560][runtime] Calculate absoluteTimeMillis only once for idle metric and throughput calculation. (#16772) * [FLINK-23560][runtime] Calculate absoluteTimeMillis only once for idle metric and throughput calculation. * [hotfix][runtime] Update measurement start time only if the measurement was started before throughput calculation. --- .../apache/flink/runtime/metrics/TimerGauge.java | 30 ++++++++++++++-- .../runtime/throughput/ThroughputCalculator.java | 32 ++++++++++++----- .../throughput/ThroughputCalculatorTest.java | 41 +++++++++++++--------- .../flink/streaming/runtime/tasks/StreamTask.java | 13 ++++--- .../streaming/runtime/tasks/StreamTaskTest.java | 4 +-- 5 files changed, 86 insertions(+), 34 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java index 0677395..300f542 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/TimerGauge.java @@ -47,14 +47,38 @@ public class TimerGauge implements Gauge<Long>, View { } public synchronized void markStart() { + markStartUnsafe(clock.absoluteTimeMillis()); + } + + public synchronized void markEnd() { + markEndUnsafe(clock.absoluteTimeMillis()); + } + + /** + * Duplicate of {@link #markStart()} with ability passing the time from outside for possible + * optimization on calling {@link Clock#absoluteTimeMillis()}. 
+ */ + public synchronized void markStart(long absoluteTimeMillis) { + markStartUnsafe(absoluteTimeMillis); + } + + /** + * Duplicate of {@link #markEnd()} with ability passing the time from outside for possible + * optimization on calling {@link Clock#absoluteTimeMillis()}. + */ + public synchronized void markEnd(long absoluteTimeMillis) { + markEndUnsafe(absoluteTimeMillis); + } + + private void markStartUnsafe(long absoluteTimeMillis) { if (currentMeasurementStart == 0) { - currentMeasurementStart = clock.absoluteTimeMillis(); + currentMeasurementStart = absoluteTimeMillis; } } - public synchronized void markEnd() { + private void markEndUnsafe(long absoluteTimeMillis) { if (currentMeasurementStart != 0) { - currentCount += clock.absoluteTimeMillis() - currentMeasurementStart; + currentCount += absoluteTimeMillis - currentMeasurementStart; currentMeasurementStart = 0; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java index e6911f0..c3eee80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java @@ -36,36 +36,50 @@ public class ThroughputCalculator { } public void incomingDataSize(long receivedDataSize) { - resumeMeasurement(); + // Force resuming measurement. + if (measurementStartTime == NOT_TRACKED) { + measurementStartTime = clock.absoluteTimeMillis(); + } currentAccumulatedDataSize += receivedDataSize; } - /** Mark when the time should not be taken into account. */ - public void pauseMeasurement() { + /** + * Mark when the time should not be taken into account. + * + * @param absoluteTimeMillis Current absolute time received outside to avoid performance drop on + * calling {@link Clock#absoluteTimeMillis()} inside of the method. 
+ */ + public void pauseMeasurement(long absoluteTimeMillis) { if (measurementStartTime != NOT_TRACKED) { - currentMeasurementTime += clock.relativeTimeMillis() - measurementStartTime; + currentMeasurementTime += absoluteTimeMillis - measurementStartTime; } measurementStartTime = NOT_TRACKED; } - /** Mark when the time should be included to the throughput calculation. */ - public void resumeMeasurement() { + /** + * Mark when the time should be included to the throughput calculation. + * + * @param absoluteTimeMillis Current absolute time received outside to avoid performance drop on + * calling {@link Clock#absoluteTimeMillis()} inside of the method. + */ + public void resumeMeasurement(long absoluteTimeMillis) { if (measurementStartTime == NOT_TRACKED) { - measurementStartTime = clock.relativeTimeMillis(); + measurementStartTime = absoluteTimeMillis; } } /** @return Calculated throughput based on the collected data for the last period. */ public long calculateThroughput() { if (measurementStartTime != NOT_TRACKED) { - currentMeasurementTime += clock.relativeTimeMillis() - measurementStartTime; + long absoluteTimeMillis = clock.absoluteTimeMillis(); + currentMeasurementTime += absoluteTimeMillis - measurementStartTime; + measurementStartTime = absoluteTimeMillis; } long throughput = throughputEMA.calculateThroughput( currentAccumulatedDataSize, currentMeasurementTime); - measurementStartTime = clock.relativeTimeMillis(); currentAccumulatedDataSize = currentMeasurementTime = 0; return throughput; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java index dbe537d..4395532 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/ThroughputCalculatorTest.java @@ -35,8 +35,6 @@ public class 
ThroughputCalculatorTest extends TestCase { public void testCorrectThroughputCalculation() { ManualClock clock = new ManualClock(); ThroughputCalculator throughputCalculator = new ThroughputCalculator(clock, 10); - // Start throughput time. - throughputCalculator.calculateThroughput(); throughputCalculator.incomingDataSize(6666); clock.advanceTime(Duration.ofMillis(1)); @@ -52,8 +50,6 @@ public class ThroughputCalculatorTest extends TestCase { public void testResetValueAfterCalculation() { ManualClock clock = new ManualClock(); ThroughputCalculator throughputCalculator = new ThroughputCalculator(clock, 10); - // Start throughput time. - throughputCalculator.calculateThroughput(); throughputCalculator.incomingDataSize(666); clock.advanceTime(Duration.ofMillis(100)); @@ -70,12 +66,10 @@ public class ThroughputCalculatorTest extends TestCase { public void testIgnoringIdleTime() { ManualClock clock = new ManualClock(); ThroughputCalculator throughputCalculator = new ThroughputCalculator(clock, 10); - // Start throughput time. - throughputCalculator.calculateThroughput(); throughputCalculator.incomingDataSize(7); clock.advanceTime(Duration.ofMillis(1)); - throughputCalculator.pauseMeasurement(); + throughputCalculator.pauseMeasurement(clock.absoluteTimeMillis()); // This will be ignored because it is in idle now. clock.advanceTime(Duration.ofMillis(9)); // This should resume the measurement time. @@ -89,12 +83,10 @@ public class ThroughputCalculatorTest extends TestCase { public void testCalculationDuringIdleTime() { ManualClock clock = new ManualClock(); ThroughputCalculator throughputCalculator = new ThroughputCalculator(clock, 10); - // Start throughput time. - throughputCalculator.calculateThroughput(); throughputCalculator.incomingDataSize(10); clock.advanceTime(Duration.ofMillis(1)); - throughputCalculator.pauseMeasurement(); + throughputCalculator.pauseMeasurement(clock.absoluteTimeMillis()); // This will be ignored because it is in idle now. 
clock.advanceTime(Duration.ofMillis(9)); @@ -106,23 +98,40 @@ public class ThroughputCalculatorTest extends TestCase { ManualClock clock = new ManualClock(); ThroughputCalculator throughputCalculator = new ThroughputCalculator(clock, 10); - // Start throughput time. - throughputCalculator.calculateThroughput(); - throughputCalculator.incomingDataSize(10); // It won't be ignored. clock.advanceTime(Duration.ofMillis(3)); - throughputCalculator.resumeMeasurement(); + throughputCalculator.resumeMeasurement(clock.absoluteTimeMillis()); // It won't be ignored. clock.advanceTime(Duration.ofMillis(3)); - throughputCalculator.resumeMeasurement(); + throughputCalculator.resumeMeasurement(clock.absoluteTimeMillis()); // It won't be ignored. clock.advanceTime(Duration.ofMillis(3)); - throughputCalculator.resumeMeasurement(); + throughputCalculator.resumeMeasurement(clock.absoluteTimeMillis()); clock.advanceTime(Duration.ofMillis(1)); // resumeMeasurement should not reset the time because pauseMeasurement was not called. assertThat(throughputCalculator.calculateThroughput(), is(1_000L)); } + + @Test + public void testNotRestartTimerOnCalculationDuringIdleTime() { + ManualClock clock = new ManualClock(); + ThroughputCalculator throughputCalculator = new ThroughputCalculator(clock, 10); + + throughputCalculator.pauseMeasurement(clock.absoluteTimeMillis()); + + // Should not resume measurement. + throughputCalculator.calculateThroughput(); + + // This will be ignored because it is still in idle. + clock.advanceTime(Duration.ofMillis(9)); + + // Resume measurement. 
+ throughputCalculator.incomingDataSize(10); + clock.advanceTime(Duration.ofMillis(1)); + + assertThat(throughputCalculator.calculateThroughput(), is(10L * 1_000)); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 31110dc..b64413e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -98,6 +98,8 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TernaryBoolean; import org.apache.flink.util.WrappingRuntimeException; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.RunnableWithException; @@ -1703,6 +1705,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab * combine signal for metric and the throughput. 
*/ private static class ThroughputPeriodTimer implements PeriodTimer { + private final Clock clock = SystemClock.getInstance(); private final TimerGauge idleTimerGauge; private final ThroughputCalculator throughputCalculator; @@ -1714,14 +1717,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab @Override public void markStart() { - idleTimerGauge.markStart(); - throughputCalculator.pauseMeasurement(); + long absoluteTimeMillis = clock.absoluteTimeMillis(); + idleTimerGauge.markStart(absoluteTimeMillis); + throughputCalculator.pauseMeasurement(absoluteTimeMillis); } @Override public void markEnd() { - idleTimerGauge.markEnd(); - throughputCalculator.resumeMeasurement(); + long absoluteTimeMillis = clock.absoluteTimeMillis(); + idleTimerGauge.markEnd(absoluteTimeMillis); + throughputCalculator.resumeMeasurement(absoluteTimeMillis); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 31eef9c..9bd7195 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -1543,11 +1543,11 @@ public class StreamTaskTest extends TestLogger { SystemClock clock = SystemClock.getInstance(); - long startTs = clock.relativeTimeMillis(); + long startTs = clock.absoluteTimeMillis(); throughputCalculator.incomingDataSize(incomingDataSize); task.invoke(); long resultThroughput = throughputCalculator.calculateThroughput(); - long totalDuration = clock.relativeTimeMillis() - startTs; + long totalDuration = clock.absoluteTimeMillis() - startTs; assertThat( resultThroughput,