This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit dedd467f5fe48abdad175451155961ca2bb5a00f Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Tue Aug 6 17:52:12 2024 +0200 [FLINK-35886][task] Hide backpressure from idleness detection in TimestampsAndWatermarksOperator --- .../runtime/metrics/groups/TaskIOMetricGroup.java | 10 +++++ .../operators/TimestampsAndWatermarksOperator.java | 23 ++++++++++- .../TimestampsAndWatermarksOperatorTest.java | 44 ++++++++++++++++++++++ .../util/AbstractStreamOperatorTestHarness.java | 4 ++ 4 files changed, 79 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index a571b078c2e..25868633f30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -302,6 +302,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { return mailboxSize; } + public void registerBackPressureListener(TimerGauge.StartStopListener backPressureListener) { + hardBackPressuredTimePerSecond.registerListener(backPressureListener); + softBackPressuredTimePerSecond.registerListener(backPressureListener); + } + + public void unregisterBackPressureListener(TimerGauge.StartStopListener backPressureListener) { + hardBackPressuredTimePerSecond.unregisterListener(backPressureListener); + softBackPressuredTimePerSecond.unregisterListener(backPressureListener); + } + // ============================================================================================ // Metric Reuse // ============================================================================================ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java index 02f72f3ba32..a636ea9ab82 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java @@ -29,10 +29,10 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.util.PausableRelativeClock; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.clock.RelativeClock; -import org.apache.flink.util.clock.SystemClock; import static org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -70,6 +70,9 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T /** Whether to emit intermediate watermarks or only one final watermark at the end of input. */ private final boolean emitProgressiveWatermarks; + /** {@link PausableRelativeClock} that will be paused in case of backpressure. */ + private transient PausableRelativeClock inputActivityClock; + public TimestampsAndWatermarksOperator( WatermarkStrategy<T> watermarkStrategy, boolean emitProgressiveWatermarks) { this.watermarkStrategy = checkNotNull(watermarkStrategy); @@ -80,6 +83,12 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T @Override public void open() throws Exception { super.open(); + inputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock()); + getContainingTask() + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .registerBackPressureListener(inputActivityClock); timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup); watermarkGenerator = @@ -93,7 +102,7 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T @Override public RelativeClock getInputActivityClock() { - return SystemClock.getInstance(); + return inputActivityClock; } }) : new NoWatermarksGenerator<>(); @@ -107,6 +116,16 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T } } + @Override + public void close() throws Exception { + getContainingTask() + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .unregisterBackPressureListener(inputActivityClock); + super.close(); + } + @Override public void processElement(final StreamRecord<T> element) throws Exception { final T event = element.getValue(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java index b7f34619a30..6162c2d6f1a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.eventtime.WatermarkGenerator; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -31,10 +32,13 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; import java.io.Serializable; +import java.time.Duration; import static org.apache.flink.streaming.util.StreamRecordMatchers.streamRecord; import static org.apache.flink.streaming.util.WatermarkMatchers.legacyWatermark; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -235,6 +239,46 @@ public class TimestampsAndWatermarksOperatorTest { } } + @Test + public void watermarksWithIdlenessUnderBackpressure() throws Exception { + long idleTimeout = 100; + + TimestampsAndWatermarksOperator<Tuple2<Boolean, Long>> operator = + new TimestampsAndWatermarksOperator<>( + WatermarkStrategy.forGenerator((ctx) -> new PunctuatedWatermarkGenerator()) + .withTimestampAssigner((ctx) -> new TupleExtractor()) + .withIdleness(Duration.ofMillis(idleTimeout)), + true); + + OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>, Tuple2<Boolean, Long>> + testHarness = new OneInputStreamOperatorTestHarness<>(operator); + testHarness.open(); + + TaskIOMetricGroup taskIOMetricGroup = + testHarness.getEnvironment().getMetricGroup().getIOMetricGroup(); + taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart(); + + for (int i = 0; i < 10; i++) { + testHarness.advanceTime(idleTimeout); + } + assertThat(testHarness.getOutput(), hasSize(0)); + + taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd(); + taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart(); + + for (int i = 10; i < 20; i++) { + testHarness.advanceTime(idleTimeout); + } + assertThat(testHarness.getOutput(), hasSize(0)); + + taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd(); + + for (int i = 20; i < 30; i++) { + testHarness.advanceTime(idleTimeout); + } + assertThat(testHarness.getOutput(), contains(WatermarkStatus.IDLE)); + } + private static <T> OneInputStreamOperatorTestHarness<T, T> createTestHarness( WatermarkStrategy<T> watermarkStrategy) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index fc5a71cd61a..aa58c4ea8cc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -762,6 +762,10 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { return factory; } + public void advanceTime(long delta) throws Exception { + processingTimeService.advance(delta); + } + public void setProcessingTime(long time) throws Exception { processingTimeService.setCurrentTime(time); }