This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2be4355388f [FLINK-38454][core] Fix inconsistent idle behavior between
StatusWatermarkValve and CombinedWatermarkStatus
2be4355388f is described below
commit 2be4355388fd75d1507cfd95740054914b567916
Author: Timo Walther <[email protected]>
AuthorDate: Wed Oct 1 15:50:30 2025 +0200
[FLINK-38454][core] Fix inconsistent idle behavior between
StatusWatermarkValve and CombinedWatermarkStatus
This closes #27066.
---
.../common/eventtime/CombinedWatermarkStatus.java | 45 ++++++++++++--------
.../eventtime/WatermarkOutputMultiplexer.java | 8 +++-
.../eventtime/WatermarkOutputMultiplexerTest.java | 49 ++++++++++++++++++++++
.../SourceOperatorSplitWatermarkAlignmentTest.java | 28 ++++++++++---
4 files changed, 106 insertions(+), 24 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
index 9b76882736a..fb290ef957b 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermarkStatus.java
@@ -23,8 +23,6 @@ import org.apache.flink.annotation.Internal;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* A {@link CombinedWatermarkStatus} combines the watermark (and idleness)
updates of multiple
* partitions/shards/splits into one combined watermark.
@@ -57,34 +55,51 @@ final class CombinedWatermarkStatus {
}
/**
- * Checks whether we need to update the combined watermark.
+ * Checks whether we need to update the combined watermark. It can update
{@link #isIdle()}
+ * status.
*
- * <p><b>NOTE:</b>It can update {@link #isIdle()} status.
+ * <p><b>NOTE:</b>The logic here should be kept in sync with {@code
StatusWatermarkValve}.
*
* @return true, if the combined watermark changed
*/
public boolean updateCombinedWatermark() {
- long minimumOverAllOutputs = Long.MAX_VALUE;
-
- // if we don't have any outputs minimumOverAllOutputs is not valid,
it's still
- // at its initial Long.MAX_VALUE state and we must not emit that
+ // if we don't have any outputs, we should not emit
if (partialWatermarks.isEmpty()) {
return false;
}
+ long maximumOverAllOutputs = Long.MIN_VALUE;
+ long minimumOverAllActiveOutputs = Long.MAX_VALUE;
+
boolean allIdle = true;
for (PartialWatermark partialWatermark : partialWatermarks) {
+ final long watermark = partialWatermark.getWatermark();
+ maximumOverAllOutputs = Math.max(maximumOverAllOutputs, watermark);
if (!partialWatermark.isIdle()) {
- minimumOverAllOutputs =
- Math.min(minimumOverAllOutputs,
partialWatermark.getWatermark());
+ minimumOverAllActiveOutputs =
Math.min(minimumOverAllActiveOutputs, watermark);
allIdle = false;
}
}
this.idle = allIdle;
- if (!allIdle && minimumOverAllOutputs > combinedWatermark) {
- combinedWatermark = minimumOverAllOutputs;
+ final long combinedWatermark;
+ if (allIdle) {
+ // If all splits are idle, we should flush all watermarks, which
effectively
+ // means emitting the maximum watermark over all outputs.
+ // Otherwise, there could be a race condition between splits when
idleness is triggered.
+ // E.g., split 1 of 2 emits 5 and goes into idle, split 2 of 2
emits 4 and goes into
+ // idle. If split 2 is idle first, watermark 5 wins. If split 1 is
idle first, watermark
+ // 4 wins. But if both are idle, we should conclude on 5.
+ combinedWatermark = maximumOverAllOutputs;
+ } else {
+ // Active splits should determine the progression of the
watermark. Therefore, the
+ // minimum watermark across all active splits takes precedence
over that of idle splits.
+ combinedWatermark = minimumOverAllActiveOutputs;
+ }
+
+ if (combinedWatermark > this.combinedWatermark) {
+ this.combinedWatermark = combinedWatermark;
return true;
}
@@ -102,12 +117,8 @@ final class CombinedWatermarkStatus {
this.onWatermarkUpdate = onWatermarkUpdate;
}
- /**
- * Returns the current watermark timestamp. This will throw {@link
IllegalStateException} if
- * the output is currently idle.
- */
+ /** Returns the current watermark timestamp. */
private long getWatermark() {
- checkState(!idle, "Output is idle.");
return watermark;
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
index 8a459db86b7..91a372722d7 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
@@ -148,14 +148,18 @@ public class WatermarkOutputMultiplexer {
/**
* Checks whether we need to update the combined watermark. Should be
called when a newly
- * emitted per-output watermark is higher than the max so far or if we
need to combined the
+ * emitted per-output watermark is higher than the max so far or if we
need to combine the
* deferred per-output updates.
+ *
+ * <p>It also handles scenarios where both emitting a watermark and
entering the idle state
+ * occur within the same invocation.
*/
private void updateCombinedWatermark() {
if (combinedWatermarkStatus.updateCombinedWatermark()) {
underlyingOutput.emitWatermark(
new
Watermark(combinedWatermarkStatus.getCombinedWatermark()));
- } else if (combinedWatermarkStatus.isIdle()) {
+ }
+ if (combinedWatermarkStatus.isIdle()) {
underlyingOutput.markIdle();
}
}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
index eaf0794a88a..4ff83f9c24a 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
@@ -109,6 +109,55 @@ class WatermarkOutputMultiplexerTest {
assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(new
Watermark(5));
}
+ @Test
+ void whenAllImmediateOutputsBecomeIdleWatermarkAdvances() {
+ TestingWatermarkOutput underlyingWatermarkOutput =
createTestingWatermarkOutput();
+ WatermarkOutputMultiplexer multiplexer =
+ new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+
+ WatermarkOutput watermarkOutput1 = createImmediateOutput(multiplexer);
+ WatermarkOutput watermarkOutput2 = createImmediateOutput(multiplexer);
+
+ watermarkOutput1.emitWatermark(new Watermark(5));
+ watermarkOutput2.emitWatermark(new Watermark(2));
+
+ assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(new
Watermark(2));
+ assertThat(underlyingWatermarkOutput.isIdle()).isFalse();
+
+ // No race condition between watermarkOutput1 and watermarkOutput2.
+ // Even if watermarkOutput1 becomes idle first, the final result is 5.
+ watermarkOutput1.markIdle();
+ watermarkOutput2.markIdle();
+
+ assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(new
Watermark(5));
+ assertThat(underlyingWatermarkOutput.isIdle()).isTrue();
+ }
+
+ @Test
+ void whenAllDeferredOutputsEmitAndIdleWatermarkAdvances() {
+ TestingWatermarkOutput underlyingWatermarkOutput =
createTestingWatermarkOutput();
+ WatermarkOutputMultiplexer multiplexer =
+ new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+
+ WatermarkOutput watermarkOutput1 = createDeferredOutput(multiplexer);
+ WatermarkOutput watermarkOutput2 = createDeferredOutput(multiplexer);
+
+ // Both emitting a watermark and becoming idle happen in the same
invocation
+ watermarkOutput1.emitWatermark(new Watermark(5));
+ watermarkOutput1.markIdle();
+ // Both emitting a watermark and becoming idle happen in the same
invocation
+ watermarkOutput2.emitWatermark(new Watermark(2));
+ watermarkOutput2.markIdle();
+
+ assertThat(underlyingWatermarkOutput.lastWatermark()).isNull();
+ assertThat(underlyingWatermarkOutput.isIdle()).isFalse();
+
+ multiplexer.onPeriodicEmit();
+
+ assertThat(underlyingWatermarkOutput.lastWatermark()).isEqualTo(new
Watermark(5));
+ assertThat(underlyingWatermarkOutput.isIdle()).isTrue();
+ }
+
@Test
void combinedWatermarkDoesNotRegressWhenIdleOutputRegresses() {
TestingWatermarkOutput underlyingWatermarkOutput =
createTestingWatermarkOutput();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 060689f0a55..8cc98109fdb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -145,13 +145,13 @@ class SourceOperatorSplitWatermarkAlignmentTest {
createAndOpenSourceOperatorWithIdleness(
sourceReader, processingTimeService, idleTimeout);
- /**
- * Intention behind this setup is that split0 emits a couple of
records, while we keep
+ /*
+ * The intention behind this setup is that split0 emits a couple of
records, while we keep
* advancing processing time and keep firing timers. Normally split1
would switch to idle
* first (it hasn't emitted any records), which would cause a
watermark from split0 to be
* emitted and then WatermarkStatus.IDLE should be emitted after
split0 also switches to
- * idle. However we assert that neither watermark no idle status this
doesn't happen due to
- * the back pressure status.
+ * idle. However, we assert that neither a watermark nor the idle status has been
+ * emitted, because the back pressure status prevents it.
*/
MockSourceSplit split0 = new MockSourceSplit(0, 0,
10).addRecord(42).addRecord(44);
MockSourceSplit split1 = new MockSourceSplit(1, 10, 20);
@@ -191,7 +191,7 @@ class SourceOperatorSplitWatermarkAlignmentTest {
}
assertThat(dataOutput.getEvents()).contains(WatermarkStatus.IDLE);
- assertThat(dataOutput.getEvents()).doNotHave(new AnyWatermark());
+ assertThat(dataOutput.getEvents()).haveAtLeastOne(new WatermarkAt(44));
}
@ParameterizedTest
@@ -601,6 +601,24 @@ class SourceOperatorSplitWatermarkAlignmentTest {
}
}
+ /** Condition checking if there is a watermark matching a certain value
among StreamElements. */
+ public static class WatermarkAt extends Condition<Object> {
+ public WatermarkAt(int emittedWatermark) {
+ super(
+ event -> {
+ if (!(event
+ instanceof
org.apache.flink.streaming.api.watermark.Watermark)) {
+ return false;
+ }
+ org.apache.flink.streaming.api.watermark.Watermark w =
+
(org.apache.flink.streaming.api.watermark.Watermark) event;
+ return w.getTimestamp() == emittedWatermark;
+ },
+ "watermark value of %d",
+ emittedWatermark);
+ }
+ }
+
/** Condition checking if there is any watermark among StreamElements. */
public static class AnyWatermark extends Condition<Object> {
public AnyWatermark() {