This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dedd467f5fe48abdad175451155961ca2bb5a00f
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Tue Aug 6 17:52:12 2024 +0200

    [FLINK-35886][task] Hide backpressure from idleness detection in TimestampsAndWatermarksOperator
---
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 10 +++++
 .../operators/TimestampsAndWatermarksOperator.java | 23 ++++++++++-
 .../TimestampsAndWatermarksOperatorTest.java       | 44 ++++++++++++++++++++++
 .../util/AbstractStreamOperatorTestHarness.java    |  4 ++
 4 files changed, 79 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index a571b078c2e..25868633f30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -302,6 +302,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
         return mailboxSize;
     }
 
+    public void registerBackPressureListener(TimerGauge.StartStopListener backPressureListener) {
+        hardBackPressuredTimePerSecond.registerListener(backPressureListener);
+        softBackPressuredTimePerSecond.registerListener(backPressureListener);
+    }
+
+    public void unregisterBackPressureListener(TimerGauge.StartStopListener backPressureListener) {
+        hardBackPressuredTimePerSecond.unregisterListener(backPressureListener);
+        softBackPressuredTimePerSecond.unregisterListener(backPressureListener);
+    }
+
     // ============================================================================================
     // Metric Reuse
     // ============================================================================================
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index 02f72f3ba32..a636ea9ab82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -29,10 +29,10 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.clock.RelativeClock;
-import org.apache.flink.util.clock.SystemClock;
 
 import static org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,6 +70,9 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
     /** Whether to emit intermediate watermarks or only one final watermark at the end of input. */
     private final boolean emitProgressiveWatermarks;
 
+    /** {@link PausableRelativeClock} that will be paused in case of backpressure. */
+    private transient PausableRelativeClock inputActivityClock;
+
     public TimestampsAndWatermarksOperator(
             WatermarkStrategy<T> watermarkStrategy, boolean emitProgressiveWatermarks) {
         this.watermarkStrategy = checkNotNull(watermarkStrategy);
@@ -80,6 +83,12 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
     @Override
     public void open() throws Exception {
         super.open();
+        inputActivityClock = new PausableRelativeClock(getProcessingTimeService().getClock());
+        getContainingTask()
+                .getEnvironment()
+                .getMetricGroup()
+                .getIOMetricGroup()
+                .registerBackPressureListener(inputActivityClock);
 
         timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
         watermarkGenerator =
@@ -93,7 +102,7 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
 
                                     @Override
                                     public RelativeClock getInputActivityClock() {
-                                        return SystemClock.getInstance();
+                                        return inputActivityClock;
                                     }
                                 })
                         : new NoWatermarksGenerator<>();
@@ -107,6 +116,16 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        getContainingTask()
+                .getEnvironment()
+                .getMetricGroup()
+                .getIOMetricGroup()
+                .unregisterBackPressureListener(inputActivityClock);
+        super.close();
+    }
+
     @Override
     public void processElement(final StreamRecord<T> element) throws Exception {
         final T event = element.getValue();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index b7f34619a30..6162c2d6f1a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -31,10 +32,13 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.time.Duration;
 
 import static org.apache.flink.streaming.util.StreamRecordMatchers.streamRecord;
 import static org.apache.flink.streaming.util.WatermarkMatchers.legacyWatermark;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -235,6 +239,46 @@ public class TimestampsAndWatermarksOperatorTest {
         }
     }
 
+    @Test
+    public void watermarksWithIdlenessUnderBackpressure() throws Exception {
+        long idleTimeout = 100;
+
+        TimestampsAndWatermarksOperator<Tuple2<Boolean, Long>> operator =
+                new TimestampsAndWatermarksOperator<>(
+                        WatermarkStrategy.forGenerator((ctx) -> new PunctuatedWatermarkGenerator())
+                                .withTimestampAssigner((ctx) -> new TupleExtractor())
+                                .withIdleness(Duration.ofMillis(idleTimeout)),
+                        true);
+
+        OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>, Tuple2<Boolean, Long>>
+                testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+        testHarness.open();
+
+        TaskIOMetricGroup taskIOMetricGroup =
+                testHarness.getEnvironment().getMetricGroup().getIOMetricGroup();
+        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart();
+
+        for (int i = 0; i < 10; i++) {
+            testHarness.advanceTime(idleTimeout);
+        }
+        assertThat(testHarness.getOutput(), hasSize(0));
+
+        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd();
+        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart();
+
+        for (int i = 10; i < 20; i++) {
+            testHarness.advanceTime(idleTimeout);
+        }
+        assertThat(testHarness.getOutput(), hasSize(0));
+
+        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd();
+
+        for (int i = 20; i < 30; i++) {
+            testHarness.advanceTime(idleTimeout);
+        }
+        assertThat(testHarness.getOutput(), contains(WatermarkStatus.IDLE));
+    }
+
     private static <T> OneInputStreamOperatorTestHarness<T, T> createTestHarness(
             WatermarkStrategy<T> watermarkStrategy) throws Exception {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index fc5a71cd61a..aa58c4ea8cc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -762,6 +762,10 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
         return factory;
     }
 
+    public void advanceTime(long delta) throws Exception {
+        processingTimeService.advance(delta);
+    }
+
     public void setProcessingTime(long time) throws Exception {
         processingTimeService.setCurrentTime(time);
     }

Reply via email to