This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2be4355388f [FLINK-38454][core] Fix inconsistent idle behavior between StatusWatermarkValve and CombinedWatermarkStatus
2be4355388f is described below

commit 2be4355388fd75d1507cfd95740054914b567916
Author: Timo Walther <[email protected]>
AuthorDate: Wed Oct 1 15:50:30 2025 +0200

    [FLINK-38454][core] Fix inconsistent idle behavior between StatusWatermarkValve and CombinedWatermarkStatus
    
    This closes #27066.
---
 .../common/eventtime/CombinedWatermarkStatus.java  | 45 ++++++++++++--------
 .../eventtime/WatermarkOutputMultiplexer.java      |  8 +++-
 .../eventtime/WatermarkOutputMultiplexerTest.java  | 49 ++++++++++++++++++++++
 .../SourceOperatorSplitWatermarkAlignmentTest.java | 28 ++++++++++---
 4 files changed, 106 insertions(+), 24 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
index 9b76882736a..fb290ef957b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
@@ -23,8 +23,6 @@ import org.apache.flink.annotation.Internal;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link CombinedWatermarkStatus} combines the watermark (and idleness) updates of multiple
  * partitions/shards/splits into one combined watermark.
@@ -57,34 +55,51 @@ final class CombinedWatermarkStatus {
     }
 
     /**
-     * Checks whether we need to update the combined watermark.
+     * Checks whether we need to update the combined watermark. It can update {@link #isIdle()}
+     * status.
      *
-     * <p><b>NOTE:</b>It can update {@link #isIdle()} status.
+     * <p><b>NOTE:</b>The logic here should be kept in sync with {@code StatusWatermarkValve}.
      *
      * @return true, if the combined watermark changed
      */
     public boolean updateCombinedWatermark() {
-        long minimumOverAllOutputs = Long.MAX_VALUE;
-
-        // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
-        // at its initial Long.MAX_VALUE state and we must not emit that
+        // if we don't have any outputs, we should not emit
         if (partialWatermarks.isEmpty()) {
             return false;
         }
 
+        long maximumOverAllOutputs = Long.MIN_VALUE;
+        long minimumOverAllActiveOutputs = Long.MAX_VALUE;
+
         boolean allIdle = true;
         for (PartialWatermark partialWatermark : partialWatermarks) {
+            final long watermark = partialWatermark.getWatermark();
+            maximumOverAllOutputs = Math.max(maximumOverAllOutputs, watermark);
             if (!partialWatermark.isIdle()) {
-                minimumOverAllOutputs =
-                        Math.min(minimumOverAllOutputs, partialWatermark.getWatermark());
+                minimumOverAllActiveOutputs = Math.min(minimumOverAllActiveOutputs, watermark);
                 allIdle = false;
             }
         }
 
         this.idle = allIdle;
 
-        if (!allIdle && minimumOverAllOutputs > combinedWatermark) {
-            combinedWatermark = minimumOverAllOutputs;
+        final long combinedWatermark;
+        if (allIdle) {
+            // If all splits are idle, we should flush all watermarks, which effectively
+            // means emitting the maximum watermark over all outputs.
+            // Otherwise, there could be a race condition between splits when idleness is triggered.
+            // E.g., split 1 of 2 emits 5 and goes into idle, split 2 of 2 emits 4 and goes into
+            // idle. If split 2 is idle first, watermark 5 wins. If split 1 is idle first, watermark
+            // 4 wins. But if both are idle, we should conclude on 5.
+            combinedWatermark = maximumOverAllOutputs;
+        } else {
+            // Active splits should determine the progression of the watermark. Therefore, the
+            // minimum watermark across all active splits takes precedence over that of idle splits.
+            combinedWatermark = minimumOverAllActiveOutputs;
+        }
+
+        if (combinedWatermark > this.combinedWatermark) {
+            this.combinedWatermark = combinedWatermark;
             return true;
         }
 
@@ -102,12 +117,8 @@ final class CombinedWatermarkStatus {
             this.onWatermarkUpdate = onWatermarkUpdate;
         }
 
-        /**
-         * Returns the current watermark timestamp. This will throw {@link IllegalStateException} if
-         * the output is currently idle.
-         */
+        /** Returns the current watermark timestamp. */
         private long getWatermark() {
-            checkState(!idle, "Output is idle.");
             return watermark;
         }
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
index 8a459db86b7..91a372722d7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
@@ -148,14 +148,18 @@ public class WatermarkOutputMultiplexer {
 
     /**
      * Checks whether we need to update the combined watermark. Should be called when a newly
-     * emitted per-output watermark is higher than the max so far or if we need to combined the
+     * emitted per-output watermark is higher than the max so far or if we need to combine the
      * deferred per-output updates.
+     *
+     * <p>It also handles scenarios where both emitting a watermark and entering the idle state
+     * occur within the same invocation.
      */
     private void updateCombinedWatermark() {
         if (combinedWatermarkStatus.updateCombinedWatermark()) {
             underlyingOutput.emitWatermark(
                     new Watermark(combinedWatermarkStatus.getCombinedWatermark()));
-        } else if (combinedWatermarkStatus.isIdle()) {
+        }
+        if (combinedWatermarkStatus.isIdle()) {
             underlyingOutput.markIdle();
         }
     }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
index eaf0794a88a..4ff83f9c24a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
@@ -109,6 +109,55 @@ class WatermarkOutputMultiplexerTest {
         assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(new Watermark(5));
     }
 
+    @Test
+    void whenAllImmediateOutputsBecomeIdleWatermarkAdvances() {
+        TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+        WatermarkOutputMultiplexer multiplexer =
+                new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+
+        WatermarkOutput watermarkOutput1 = createImmediateOutput(multiplexer);
+        WatermarkOutput watermarkOutput2 = createImmediateOutput(multiplexer);
+
+        watermarkOutput1.emitWatermark(new Watermark(5));
+        watermarkOutput2.emitWatermark(new Watermark(2));
+
+        assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(new Watermark(2));
+        assertThat(underlyingWatermarkOutput.isIdle()).isFalse();
+
+        // No race condition between watermarkOutput1 and watermarkOutput2.
+        // Even if watermarkOutput1 becomes idle first, the final result is 5.
+        watermarkOutput1.markIdle();
+        watermarkOutput2.markIdle();
+
+        assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(new Watermark(5));
+        assertThat(underlyingWatermarkOutput.isIdle()).isTrue();
+    }
+
+    @Test
+    void whenAllDeferredOutputsEmitAndIdleWatermarkAdvances() {
+        TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+        WatermarkOutputMultiplexer multiplexer =
+                new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+
+        WatermarkOutput watermarkOutput1 = createDeferredOutput(multiplexer);
+        WatermarkOutput watermarkOutput2 = createDeferredOutput(multiplexer);
+
+        // Both emitting a watermark and becoming idle happen in the same invocation
+        watermarkOutput1.emitWatermark(new Watermark(5));
+        watermarkOutput1.markIdle();
+        // Both emitting a watermark and becoming idle happen in the same invocation
+        watermarkOutput2.emitWatermark(new Watermark(2));
+        watermarkOutput2.markIdle();
+
+        assertThat(underlyingWatermarkOutput.lastWatermark()).isNull();
+        assertThat(underlyingWatermarkOutput.isIdle()).isFalse();
+
+        multiplexer.onPeriodicEmit();
+
+        assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(new Watermark(5));
+        assertThat(underlyingWatermarkOutput.isIdle()).isTrue();
+    }
+
     @Test
     void combinedWatermarkDoesNotRegressWhenIdleOutputRegresses() {
         TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 060689f0a55..8cc98109fdb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -145,13 +145,13 @@ class SourceOperatorSplitWatermarkAlignmentTest {
                 createAndOpenSourceOperatorWithIdleness(
                         sourceReader, processingTimeService, idleTimeout);
 
-        /**
-         * Intention behind this setup is that split0 emits a couple of records, while we keep
+        /*
+         * The intention behind this setup is that split0 emits a couple of records, while we keep
          * advancing processing time and keep firing timers. Normally split1 would switch to idle
          * first (it hasn't emitted any records), which would cause a watermark from split0 to be
          * emitted and then WatermarkStatus.IDLE should be emitted after split0 also switches to
-         * idle. However we assert that neither watermark no idle status this doesn't happen due to
-         * the back pressure status.
+         * idle. However, we assert neither watermark nor idle status have been emitted; this
+         * doesn't happen due to the back pressure status.
          */
         MockSourceSplit split0 = new MockSourceSplit(0, 0, 10).addRecord(42).addRecord(44);
         MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
@@ -191,7 +191,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         }
 
         assertThat(dataOutput.getEvents()).contains(WatermarkStatus.IDLE);
-        assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark());
+        assertThat(dataOutput.getEvents()).haveAtLeastOne(new WatermarkAt(44));
     }
 
     @ParameterizedTest
@@ -601,6 +601,24 @@ class SourceOperatorSplitWatermarkAlignmentTest {
         }
     }
 
+    /** Condition checking if there is a watermark matching a certain value among StreamElements. */
+    public static class WatermarkAt extends Condition<Object> {
+        public WatermarkAt(int emittedWatermark) {
+            super(
+                    event -> {
+                        if (!(event
+                                instanceof org.apache.flink.streaming.api.watermark.Watermark)) {
+                            return false;
+                        }
+                        org.apache.flink.streaming.api.watermark.Watermark w =
+                                (org.apache.flink.streaming.api.watermark.Watermark) event;
+                        return w.getTimestamp() == emittedWatermark;
+                    },
+                    "watermark value of %d",
+                    emittedWatermark);
+        }
+    }
+
     /** Condition checking if there is any watermark among StreamElements. */
     public static class AnyWatermark extends Condition<Object> {
         public AnyWatermark() {

Reply via email to