This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74d47efaf9d15078763ccae831fac005f42508cb
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Mon Aug 5 15:26:29 2024 +0200

    [FLINK-35886][task] Hide backpressure and watermark alignment from idleness detection in SourceOperator
---
 .../streaming/api/operators/SourceOperator.java    |  27 +++-
 .../source/NoOpTimestampsAndWatermarks.java        |   6 +
 .../source/ProgressiveTimestampsAndWatermarks.java |  79 ++++++++--
 .../operators/source/TimestampsAndWatermarks.java  |  47 +++++-
 .../source/TimestampsAndWatermarksContext.java     |   8 +-
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 167 +++++++++++++++++++++
 6 files changed, 311 insertions(+), 23 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index d9a04853107..b26edc77c63 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
@@ -52,6 +53,7 @@ import 
org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
+import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
@@ -197,6 +199,12 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     private final CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;
 
+    /**
+     * {@link PausableRelativeClock} tracking activity of the operator's main 
input. It's paused on
+     * backpressure. Note, each split output has its own independent {@link 
PausableRelativeClock}.
+     */
+    private transient PausableRelativeClock mainInputActivityClock;
+
     public SourceOperator(
             FunctionWithException<SourceReaderContext, SourceReader<OUT, 
SplitT>, Exception>
                     readerFactory,
@@ -322,6 +330,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     @Override
     public void open() throws Exception {
+        mainInputActivityClock = new 
PausableRelativeClock(getProcessingTimeService().getClock());
+        TaskIOMetricGroup taskIOMetricGroup =
+                
getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup();
+        taskIOMetricGroup.registerBackPressureListener(mainInputActivityClock);
+
         initReader();
 
         // in the future when we this one is migrated to the "eager 
initialization" operator
@@ -332,11 +345,14 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                             watermarkStrategy,
                             sourceMetricGroup,
                             getProcessingTimeService(),
-                            getExecutionConfig().getAutoWatermarkInterval());
+                            getExecutionConfig().getAutoWatermarkInterval(),
+                            mainInputActivityClock,
+                            getProcessingTimeService().getClock(),
+                            taskIOMetricGroup);
         } else {
             eventTimeLogic =
                     TimestampsAndWatermarks.createNoOpEventTimeLogic(
-                            watermarkStrategy, sourceMetricGroup);
+                            watermarkStrategy, sourceMetricGroup, 
mainInputActivityClock);
         }
 
         // restore the state if necessary.
@@ -396,6 +412,12 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
 
     @Override
     public void close() throws Exception {
+        getContainingTask()
+                .getEnvironment()
+                .getMetricGroup()
+                .getIOMetricGroup()
+                .unregisterBackPressureListener(mainInputActivityClock);
+
         if (sourceReader != null) {
             sourceReader.close();
         }
@@ -675,6 +697,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
         try {
             sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
+            eventTimeLogic.pauseOrResumeSplits(splitsToPause, splitsToResume);
         } catch (UnsupportedOperationException e) {
             if (!allowUnalignedSourceSplits) {
                 throw e;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
index 3d21512a143..b2debd089b5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/NoOpTimestampsAndWatermarks.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
 
+import java.util.Collection;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -68,6 +70,10 @@ public class NoOpTimestampsAndWatermarks<T> implements 
TimestampsAndWatermarks<T
         // do nothing
     }
 
+    /** Intentionally a no-op: this implementation does not pause or resume anything. */
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> 
splitsToResume) {}
+
     // ------------------------------------------------------------------------
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
index 98507c096b9..ca0c5f47b9b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
@@ -27,16 +27,23 @@ import 
org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.streaming.api.operators.util.PausableRelativeClock;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.RelativeClock;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -53,12 +60,18 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
 
     private final WatermarkGeneratorSupplier<T> watermarksFactory;
 
-    private final WatermarkGeneratorSupplier.Context watermarksContext;
+    private final TimestampsAndWatermarksContextProvider 
watermarksContextProvider;
 
     private final ProcessingTimeService timeService;
 
     private final long periodicWatermarkInterval;
 
+    private final RelativeClock mainInputActivityClock;
+
+    private final Clock clock;
+
+    private final TaskIOMetricGroup taskIOMetricGroup;
+
     @Nullable private SplitLocalOutputs<T> currentPerSplitOutputs;
 
     @Nullable private StreamingReaderOutput<T> currentMainOutput;
@@ -68,14 +81,20 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
     public ProgressiveTimestampsAndWatermarks(
             TimestampAssigner<T> timestampAssigner,
             WatermarkGeneratorSupplier<T> watermarksFactory,
-            WatermarkGeneratorSupplier.Context watermarksContext,
+            TimestampsAndWatermarksContextProvider watermarksContextProvider,
             ProcessingTimeService timeService,
-            Duration periodicWatermarkInterval) {
+            Duration periodicWatermarkInterval,
+            RelativeClock mainInputActivityClock,
+            Clock clock,
+            TaskIOMetricGroup taskIOMetricGroup) {
 
         this.timestampAssigner = timestampAssigner;
         this.watermarksFactory = watermarksFactory;
-        this.watermarksContext = watermarksContext;
+        this.watermarksContextProvider = watermarksContextProvider;
         this.timeService = timeService;
+        this.mainInputActivityClock = mainInputActivityClock;
+        this.clock = clock;
+        this.taskIOMetricGroup = taskIOMetricGroup;
 
         long periodicWatermarkIntervalMillis;
         try {
@@ -106,7 +125,8 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
         IdlenessManager idlenessManager = new IdlenessManager(watermarkOutput);
 
         final WatermarkGenerator<T> watermarkGenerator =
-                watermarksFactory.createWatermarkGenerator(watermarksContext);
+                watermarksFactory.createWatermarkGenerator(
+                        
watermarksContextProvider.create(mainInputActivityClock));
 
         currentPerSplitOutputs =
                 new SplitLocalOutputs<>(
@@ -115,7 +135,9 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
                         watermarkUpdateListener,
                         timestampAssigner,
                         watermarksFactory,
-                        watermarksContext);
+                        watermarksContextProvider,
+                        clock,
+                        taskIOMetricGroup);
 
         currentMainOutput =
                 new StreamingReaderOutput<>(
@@ -162,6 +184,12 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
         }
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+        // Forwards to the per-split outputs, which own one input activity clock per split.
+        // NOTE(review): currentPerSplitOutputs is @Nullable; confirm this is never invoked before
+        // the per-split outputs have been created, otherwise it throws NullPointerException.
+        currentPerSplitOutputs.pauseOrResumeSplits(splitsToPause, 
splitsToResume);
+    }
+
     // ------------------------------------------------------------------------
 
     private static final class StreamingReaderOutput<T> extends 
SourceOutputWithWatermarks<T>
@@ -203,11 +231,14 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
 
         private final WatermarkOutputMultiplexer watermarkMultiplexer;
         private final Map<String, SourceOutputWithWatermarks<T>> localOutputs;
+        private final Map<String, PausableRelativeClock> inputActivityClocks = 
new HashMap<>();
         private final PushingAsyncDataInput.DataOutput<T> recordOutput;
         private final TimestampAssigner<T> timestampAssigner;
         private final WatermarkGeneratorSupplier<T> watermarksFactory;
-        private final WatermarkGeneratorSupplier.Context watermarkContext;
+        private final TimestampsAndWatermarksContextProvider 
watermarksContextProvider;
         private final WatermarkUpdateListener watermarkUpdateListener;
+        private final Clock clock;
+        private final TaskIOMetricGroup taskIOMetricGroup;
 
         private SplitLocalOutputs(
                 PushingAsyncDataInput.DataOutput<T> recordOutput,
@@ -215,13 +246,17 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
                 WatermarkUpdateListener watermarkUpdateListener,
                 TimestampAssigner<T> timestampAssigner,
                 WatermarkGeneratorSupplier<T> watermarksFactory,
-                WatermarkGeneratorSupplier.Context watermarkContext) {
+                TimestampsAndWatermarksContextProvider 
watermarksContextProvider,
+                Clock clock,
+                TaskIOMetricGroup taskIOMetricGroup) {
 
             this.recordOutput = recordOutput;
             this.timestampAssigner = timestampAssigner;
             this.watermarksFactory = watermarksFactory;
-            this.watermarkContext = watermarkContext;
+            this.watermarksContextProvider = watermarksContextProvider;
             this.watermarkUpdateListener = watermarkUpdateListener;
+            this.clock = clock;
+            this.taskIOMetricGroup = taskIOMetricGroup;
 
             this.watermarkMultiplexer = new 
WatermarkOutputMultiplexer(watermarkOutput);
             this.localOutputs =
@@ -234,6 +269,7 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
                 return previous;
             }
 
+            PausableRelativeClock inputActivityClock = 
createInputActivityClock(splitId);
             watermarkMultiplexer.registerNewOutput(
                     splitId,
                     watermark ->
@@ -243,7 +279,8 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
             final WatermarkOutput periodicOutput = 
watermarkMultiplexer.getDeferredOutput(splitId);
 
             final WatermarkGenerator<T> watermarks =
-                    
watermarksFactory.createWatermarkGenerator(watermarkContext);
+                    watermarksFactory.createWatermarkGenerator(
+                            
watermarksContextProvider.create(inputActivityClock));
 
             final SourceOutputWithWatermarks<T> localOutput =
                     SourceOutputWithWatermarks.createWithSeparateOutputs(
@@ -257,9 +294,21 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
             return localOutput;
         }
 
+        private PausableRelativeClock createInputActivityClock(String splitId) 
{
+            // Dedicated inputActivityClock for a particular split. It will be paused both in case
+            // of back pressure (it is registered as a back pressure listener below) and when the
+            // split is paused due to watermark alignment (see pauseOrResumeSplits). It is
+            // unregistered again in releaseOutputForSplit.
+            PausableRelativeClock inputActivityClock = new 
PausableRelativeClock(clock);
+            inputActivityClocks.put(splitId, inputActivityClock);
+            taskIOMetricGroup.registerBackPressureListener(inputActivityClock);
+            return inputActivityClock;
+        }
+
         void releaseOutputForSplit(String splitId) {
             localOutputs.remove(splitId);
             watermarkMultiplexer.unregisterOutput(splitId);
+            // Every split output gets its own clock from createInputActivityClock, so a missing
+            // entry is a bookkeeping error; unregister the clock as a back pressure listener.
+            PausableRelativeClock inputActivityClock =
+                    requireNonNull(inputActivityClocks.remove(splitId));
+            
taskIOMetricGroup.unregisterBackPressureListener(inputActivityClock);
         }
 
         void emitPeriodicWatermark() {
@@ -273,6 +322,16 @@ public class ProgressiveTimestampsAndWatermarks<T> 
implements TimestampsAndWater
             }
             watermarkMultiplexer.onPeriodicEmit();
         }
+
+        /**
+         * Pauses the input activity clocks of {@code splitsToPause} and un-pauses those of {@code
+         * splitsToResume}, so that splits blocked by watermark alignment are hidden from idleness
+         * detection.
+         *
+         * <p>NOTE(review): {@code inputActivityClocks.get(splitId)} returns null for a split id
+         * without a live output (never created, or already released), which would throw a
+         * NullPointerException here. Confirm callers only pass split ids with live outputs.
+         */
+        public void pauseOrResumeSplits(
+                Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+            for (String splitId : splitsToPause) {
+                inputActivityClocks.get(splitId).pause();
+            }
+            for (String splitId : splitsToResume) {
+                inputActivityClocks.get(splitId).unPause();
+            }
+        }
     }
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
index 84bb7457601..b6e1a95acaf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -24,10 +24,14 @@ import 
org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.RelativeClock;
 
 import java.time.Duration;
+import java.util.Collection;
 
 /**
  * Basic interface for the timestamp extraction and watermark generation logic 
for the {@link
@@ -83,6 +87,8 @@ public interface TimestampsAndWatermarks<T> {
     /** Emit a watermark immediately. */
     void emitImmediateWatermark(long wallClockTimestamp);
 
+    void pauseOrResumeSplits(Collection<String> splitsToPause, 
Collection<String> splitsToResume);
+
     // ------------------------------------------------------------------------
     //  factories
     // ------------------------------------------------------------------------
@@ -91,27 +97,52 @@ public interface TimestampsAndWatermarks<T> {
             WatermarkStrategy<E> watermarkStrategy,
             MetricGroup metrics,
             ProcessingTimeService timeService,
-            long periodicWatermarkIntervalMillis) {
+            long periodicWatermarkIntervalMillis,
+            RelativeClock mainInputActivityClock,
+            Clock clock,
+            TaskIOMetricGroup taskIOMetricGroup) {
 
-        final TimestampsAndWatermarksContext context = new 
TimestampsAndWatermarksContext(metrics);
-        final TimestampAssigner<E> timestampAssigner =
-                watermarkStrategy.createTimestampAssigner(context);
+        TimestampsAndWatermarksContextProvider contextProvider =
+                new TimestampsAndWatermarksContextProvider(metrics);
+        TimestampAssigner<E> timestampAssigner =
+                watermarkStrategy.createTimestampAssigner(
+                        contextProvider.create(mainInputActivityClock));
 
         return new ProgressiveTimestampsAndWatermarks<>(
                 timestampAssigner,
                 watermarkStrategy,
-                context,
+                contextProvider,
                 timeService,
-                Duration.ofMillis(periodicWatermarkIntervalMillis));
+                Duration.ofMillis(periodicWatermarkIntervalMillis),
+                mainInputActivityClock,
+                clock,
+                taskIOMetricGroup);
     }
 
     static <E> TimestampsAndWatermarks<E> createNoOpEventTimeLogic(
-            WatermarkStrategy<E> watermarkStrategy, MetricGroup metrics) {
+            WatermarkStrategy<E> watermarkStrategy,
+            MetricGroup metrics,
+            RelativeClock inputActivityClock) {
 
-        final TimestampsAndWatermarksContext context = new 
TimestampsAndWatermarksContext(metrics);
+        // The caller-supplied clock becomes the context's input activity clock.
+        final TimestampsAndWatermarksContext context =
+                new TimestampsAndWatermarksContext(metrics, 
inputActivityClock);
         final TimestampAssigner<E> timestampAssigner =
                 watermarkStrategy.createTimestampAssigner(context);
 
         return new NoOpTimestampsAndWatermarks<>(timestampAssigner);
     }
+
+    /**
+     * Helper class to construct {@link TimestampsAndWatermarksContext}s that share the same {@link
+     * MetricGroup} but can each use a different input activity clock.
+     */
+    @Internal
+    class TimestampsAndWatermarksContextProvider {
+        private final MetricGroup metrics;
+
+        public TimestampsAndWatermarksContextProvider(MetricGroup metrics) {
+            this.metrics = metrics;
+        }
+
+        /** Creates a context backed by the shared metric group and the given clock. */
+        public TimestampsAndWatermarksContext create(RelativeClock 
inputActivityClock) {
+            return new TimestampsAndWatermarksContext(metrics, 
inputActivityClock);
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
index 8ec6cc8c01f..6f543e0392c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.clock.RelativeClock;
-import org.apache.flink.util.clock.SystemClock;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -36,9 +35,12 @@ public final class TimestampsAndWatermarksContext
         implements TimestampAssignerSupplier.Context, 
WatermarkGeneratorSupplier.Context {
 
     private final MetricGroup metricGroup;
+    private final RelativeClock inputActivityClock;
 
-    public TimestampsAndWatermarksContext(MetricGroup metricGroup) {
+    public TimestampsAndWatermarksContext(
+            MetricGroup metricGroup, RelativeClock inputActivityClock) {
         this.metricGroup = checkNotNull(metricGroup);
+        // NOTE(review): unlike metricGroup this is not null-checked; getInputActivityClock()
+        // returns it as-is.
+        this.inputActivityClock = inputActivityClock;
     }
 
     @Override
@@ -48,6 +50,6 @@ public final class TimestampsAndWatermarksContext
 
     @Override
     public RelativeClock getInputActivityClock() {
-        return SystemClock.getInstance();
+        // Caller-provided clock (e.g. one paused on back pressure) instead of the system clock.
+        return inputActivityClock;
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 8be17a021d5..1a199b23123 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -40,6 +41,7 @@ import 
org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.MockOutput;
 import org.apache.flink.streaming.util.MockStreamConfig;
 
@@ -110,6 +112,144 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         assertThat(sourceReader.getPausedSplits()).containsExactly("0", "1");
     }
 
+    /**
+     * Verifies that while the task is back pressured neither WatermarkStatus.IDLE nor any
+     * watermark is emitted, and that WatermarkStatus.IDLE (still without any watermark) is
+     * emitted once back pressure ends.
+     */
+    @Test
+    void testBackpressureAndIdleness() throws Exception {
+        long idleTimeout = 100;
+        MockSourceReader sourceReader =
+                new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
false, true);
+        TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+        SourceOperator<Integer, MockSourceSplit> operator =
+                createAndOpenSourceOperatorWithIdleness(
+                        sourceReader, processingTimeService, idleTimeout);
+
+        /*
+         * Intention behind this setup is that split0 emits a couple of records, while we keep
+         * advancing processing time and keep firing timers. Normally split1 would switch to idle
+         * first (it hasn't emitted any records), which would cause a watermark from split0 to be
+         * emitted, and then WatermarkStatus.IDLE would be emitted after split0 also switches to
+         * idle. However, we assert that neither the watermark nor the idle status is emitted while
+         * the task is back pressured.
+         */
+        MockSourceSplit split0 = new MockSourceSplit(0, 0, 
10).addRecord(42).addRecord(44);
+        MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Arrays.asList(split0, split1), new 
MockSourceSplitSerializer()));
+
+        CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
+
+        // Output is initialised by the SourceOperator on the first emitNext invocation
+        operator.emitNext(dataOutput);
+
+        TaskIOMetricGroup taskIOMetricGroup =
+                
operator.getContainingTask().getEnvironment().getMetricGroup().getIOMetricGroup();
+        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markStart();
+
+        for (int i = 0; i < 10; i++) {
+            processingTimeService.advance(idleTimeout);
+            operator.emitNext(dataOutput);
+        }
+        
assertThat(dataOutput.getEvents()).doesNotContain(WatermarkStatus.IDLE);
+        assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark());
+
+        // Switch from hard to soft back pressure: idleness must still stay hidden.
+        taskIOMetricGroup.getHardBackPressuredTimePerSecond().markEnd();
+        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markStart();
+
+        for (int i = 0; i < 10; i++) {
+            processingTimeService.advance(idleTimeout);
+        }
+        
assertThat(dataOutput.getEvents()).doesNotContain(WatermarkStatus.IDLE);
+        assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark());
+
+        // Back pressure ends: from now on the idle timeout can elapse.
+        taskIOMetricGroup.getSoftBackPressuredTimePerSecond().markEnd();
+
+        for (int i = 0; i < 10; i++) {
+            processingTimeService.advance(idleTimeout);
+        }
+
+        assertThat(dataOutput.getEvents()).contains(WatermarkStatus.IDLE);
+        assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark());
+    }
+
+    /**
+     * Verifies that a split paused by watermark alignment is not considered idle, so records from
+     * the other split do not bump the watermark above the paused split's last emitted value.
+     */
+    @Test
+    void testSplitWatermarkAlignmentAndIdleness() throws Exception {
+        long idleTimeout = 100;
+        MockSourceReader sourceReader =
+                new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS, 
false, true);
+        TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+        SourceOperator<Integer, MockSourceSplit> operator =
+                createAndOpenSourceOperatorWithIdleness(
+                        sourceReader, processingTimeService, idleTimeout);
+
+        MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
+        MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
+        int maxAllowedWatermark = 4;
+        int maxEmittedWatermark = maxAllowedWatermark + 1;
+        // the intention is that only the first record from split0 gets emitted, then split0 gets
+        // blocked and record (maxEmittedWatermark + 100) is never emitted from split0
+        split0.addRecord(maxEmittedWatermark).addRecord(maxEmittedWatermark + 
100);
+        split1.addRecord(3)
+                .addRecord(3)
+                .addRecord(3)
+                .addRecord(3)
+                .addRecord(3)
+                .addRecord(3)
+                .addRecord(3);
+        split1.addRecord(maxEmittedWatermark + 100);
+
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Arrays.asList(split0, split1), new 
MockSourceSplitSerializer()));
+        CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
+
+        operator.emitNext(dataOutput); // split0 emits first (and only) record 
(maxEmittedWatermark)
+
+        operator.handleOperatorEvent(
+                new WatermarkAlignmentEvent(maxAllowedWatermark)); // blocks 
split0
+        assertThat(sourceReader.getPausedSplits()).containsExactly("0");
+
+        while (operator.isAvailable()) {
+            // We are advancing a couple of times by (idleTimeout - 1) to make sure the active input
+            // never switches to idle, while giving plenty of time for the blocked split0 to
+            // evaluate its idle state.
+            processingTimeService.advance(idleTimeout - 1);
+            operator.emitNext(dataOutput); // split1 keeps emitting records
+        }
+        // in the end, all records are emitted from split1. This shouldn't cause the watermark to
+        // get bumped above maxEmittedWatermark, as split0 shouldn't be idle and it is still
+        // blocked.
+        assertThat(sourceReader.getPausedSplits()).containsExactly("0", "1");
+        assertThat(dataOutput.getEvents()).doNotHave(new 
WatermarkAbove(maxEmittedWatermark));
+    }
+
+    /**
+     * Creates a {@link TestingSourceOperator} whose watermark strategy enables split watermark
+     * alignment and the given idle timeout, then sets it up, initializes its state and opens it.
+     */
+    private SourceOperator<Integer, MockSourceSplit> 
createAndOpenSourceOperatorWithIdleness(
+            MockSourceReader sourceReader,
+            TestProcessingTimeService processingTimeService,
+            long idleTimeout)
+            throws Exception {
+
+        SourceOperator<Integer, MockSourceSplit> operator =
+                new TestingSourceOperator<>(
+                        sourceReader,
+                        WatermarkStrategy.forGenerator(ctx -> new 
TestWatermarkGenerator())
+                                .withTimestampAssigner((r, l) -> r)
+                                .withWatermarkAlignment("group-1", 
Duration.ofMillis(1))
+                                .withIdleness(Duration.ofMillis(idleTimeout)),
+                        processingTimeService,
+                        new MockOperatorEventGateway(),
+                        1,
+                        5,
+                        true);
+        Environment env = getTestingEnvironment();
+        operator.setup(
+                new SourceOperatorStreamTask<Integer>(env),
+                new MockStreamConfig(new Configuration(), 1),
+                new MockOutput<>(new ArrayList<>()));
+        operator.initializeState(new StreamTaskStateInitializerImpl(env, new 
MemoryStateBackend()));
+        operator.open();
+        return operator;
+    }
+
     private Environment getTestingEnvironment() {
         return new StreamMockEnvironment(
                 new Configuration(),
@@ -138,4 +278,31 @@ class SourceOperatorSplitWatermarkAlignmentTest {
             output.emitWatermark(new Watermark(maxWatermark));
         }
     }
+
+    /**
+     * Condition matching a {@link org.apache.flink.streaming.api.watermark.Watermark} whose
+     * timestamp is greater than a given value. Use it with {@code doNotHave} to assert that no
+     * such watermark was emitted.
+     */
+    public static class WatermarkAbove extends Condition<Object> {
+        public WatermarkAbove(int maxEmittedWatermark) {
+            super(
+                    event -> {
+                        if (!(event
+                                instanceof 
org.apache.flink.streaming.api.watermark.Watermark)) {
+                            return false;
+                        }
+                        org.apache.flink.streaming.api.watermark.Watermark w =
+                                
(org.apache.flink.streaming.api.watermark.Watermark) event;
+                        return w.getTimestamp() > maxEmittedWatermark;
+                    },
+                    "watermark value of greater than %d",
+                    maxEmittedWatermark);
+        }
+    }
+
+    /**
+     * Condition matching any {@link org.apache.flink.streaming.api.watermark.Watermark} element,
+     * regardless of its timestamp.
+     */
+    public static class AnyWatermark extends Condition<Object> {
+        public AnyWatermark() {
+            super(
+                    event -> event instanceof 
org.apache.flink.streaming.api.watermark.Watermark,
+                    "any watermark");
+        }
+    }
 }

Reply via email to