This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 08649fd6981 [FLINK-35885][table] Prohibit advancing the progress of processing time window through watermark 08649fd6981 is described below commit 08649fd6981655bcd131c76ef36f6dd074566dd0 Author: Xuyang <xyzhong...@163.com> AuthorDate: Fri Aug 9 16:10:29 2024 +0800 [FLINK-35885][table] Prohibit advancing the progress of processing time window through watermark This closes #25180 --- .../nodes/exec/stream/StreamExecWindowRank.java | 1 + .../harness/WindowAggregateHarnessTest.scala | 46 ++++++++++++++++++++++ .../aggregate/window/WindowAggOperatorBuilder.java | 2 +- .../RowTimeWindowDeduplicateOperatorBuilder.java | 2 +- .../rank/window/WindowRankOperatorBuilder.java | 11 +++++- .../window/tvf/common/WindowAggOperator.java | 27 ++++++++----- .../rank/window/WindowRankOperatorTest.java | 3 ++ 7 files changed, 79 insertions(+), 13 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java index 65471465923..7be4ca61b11 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java @@ -252,6 +252,7 @@ public class StreamExecWindowRank extends ExecNodeBase<RowData> .rankStart(constantRankRange.getRankStart()) .rankEnd(constantRankRange.getRankEnd()) .windowEndIndex(windowEndIndex) + .withEventTime(windowing.isRowtime()) .build(); OneInputTransformation<RowData, RowData> transform = diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala index b33303ba22a..f46a992592f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.runtime.harness import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.runtime.streamrecord.StreamRecord import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness import org.apache.flink.table.api._ @@ -685,6 +686,51 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI testHarness1.close() } + @TestTemplate + def testProcessingTimeTumbleWindowWithFutureWatermark(): Unit = { + val (testHarness, outputTypes) = + createProcessingTimeWindowOperator(TUMBLE, isCDCSource = false) + val assertor = new RowDataHarnessAssertor(outputTypes) + + testHarness.open() + + // mock a large watermark arrives before proctime + testHarness.processWatermark(10000L) + + testHarness.setProcessingTime(1000L) + testHarness.processElement(insertRecord("a", 1d, "str1", null)) + testHarness.setProcessingTime(2000L) + testHarness.processElement(insertRecord("a", 2d, "str2", null)) + testHarness.setProcessingTime(3000L) + testHarness.processElement(insertRecord("a", 3d, "str2", null)) + + testHarness.setProcessingTime(6000L) + testHarness.processElement(insertRecord("a", 4d, "str1", null)) + + testHarness.setProcessingTime(50000L) + + val expected = new ConcurrentLinkedQueue[Object]() + expected.add(new Watermark(10000L)) + expected.add( + insertRecord( + "a", + 3L, + 3.0d, + 2L, + localMills("1970-01-01T00:00:00"), + localMills("1970-01-01T00:00:05"))) + expected.add( + 
insertRecord( + "a", + 1L, + 4.0d, + 1L, + localMills("1970-01-01T00:00:05"), + localMills("1970-01-01T00:00:10"))) + assertor.assertOutputEqualsSorted("result mismatch", expected, testHarness.getOutput) + testHarness.close() + } + private def createProcessingTimeWindowOperator(testWindow: String, isCDCSource: Boolean) : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = { val windowDDL = testWindow match { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java index 77494b721e0..be368f5e6f9 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java @@ -153,7 +153,7 @@ public class WindowAggOperatorBuilder { } else { windowProcessor = buildUnslicingWindowProcessor(); } - return new WindowAggOperator<>(windowProcessor); + return new WindowAggOperator<>(windowProcessor, assigner.isEventTime()); } @SuppressWarnings("unchecked") diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java index 16f4228f3c8..ce152cca741 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java +++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java @@ -108,6 +108,6 @@ public class RowTimeWindowDeduplicateOperatorBuilder { final SlicingWindowProcessor<Long> windowProcessor = new RowTimeWindowDeduplicateProcessor( inputSerializer, bufferFactory, windowEndIndex, shiftTimeZone); - return new WindowAggOperator<>(windowProcessor); + return new WindowAggOperator<>(windowProcessor, true); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java index cf0b8f68d76..807197ae076 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java @@ -50,6 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * .rankStart(0) * .rankEnd(100) * .windowEndIndex(windowEndIndex) + * .withEventTime(true) * .build(); * </pre> */ @@ -68,6 +69,7 @@ public class WindowRankOperatorBuilder { private long rankEnd = -1; private int windowEndIndex = -1; private ZoneId shiftTimeZone; + private Boolean isEventTime; public WindowRankOperatorBuilder inputSerializer( AbstractRowDataSerializer<RowData> inputSerializer) { @@ -116,11 +118,17 @@ public class WindowRankOperatorBuilder { return this; } + public WindowRankOperatorBuilder withEventTime(Boolean isEventTime) { + this.isEventTime = isEventTime; + return this; + } + public WindowAggOperator<RowData, ?> build() { checkNotNull(inputSerializer); checkNotNull(keySerializer); checkNotNull(sortKeySelector); checkNotNull(generatedSortKeyComparator); + checkNotNull(isEventTime); checkArgument( 
rankStart > 0, String.format("Illegal rank start %s, it should be positive!", rankStart)); @@ -152,6 +160,7 @@ public class WindowRankOperatorBuilder { outputRankNumber, windowEndIndex, shiftTimeZone); - return new WindowAggOperator<>(windowProcessor); + // Processing time Window TopN is not supported yet. + return new WindowAggOperator<>(windowProcessor, isEventTime); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java index de797ed934a..b94c1b0f806 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java @@ -103,32 +103,35 @@ public final class WindowAggOperator<K, W> extends TableStreamOperator<RowData> private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency"; /** The concrete window operator implementation. */ - protected final WindowProcessor<W> windowProcessor; + private final WindowProcessor<W> windowProcessor; + + private final boolean isEventTime; // ------------------------------------------------------------------------ /** This is used for emitting elements with a given timestamp. */ - protected transient TimestampedCollector<RowData> collector; + private transient TimestampedCollector<RowData> collector; /** The service to register timers. */ - protected transient InternalTimerService<W> internalTimerService; + private transient InternalTimerService<W> internalTimerService; /** The tracked processing time triggered last time. */ - protected transient long lastTriggeredProcessingTime; + private transient long lastTriggeredProcessingTime; /** The operator state to store watermark. 
*/ - protected transient ListState<Long> watermarkState; + private transient ListState<Long> watermarkState; // ------------------------------------------------------------------------ // Metrics // ------------------------------------------------------------------------ - protected transient Counter numLateRecordsDropped; - protected transient Meter lateRecordsDroppedRate; - protected transient Gauge<Long> watermarkLatency; + private transient Counter numLateRecordsDropped; + private transient Meter lateRecordsDroppedRate; + private transient Gauge<Long> watermarkLatency; - public WindowAggOperator(WindowProcessor<W> windowProcessor) { + public WindowAggOperator(WindowProcessor<W> windowProcessor, boolean isEventTime) { this.windowProcessor = windowProcessor; + this.isEventTime = isEventTime; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -222,7 +225,11 @@ public final class WindowAggOperator<K, W> extends TableStreamOperator<RowData> @Override public void processWatermark(Watermark mark) throws Exception { if (mark.getTimestamp() > currentWatermark) { - windowProcessor.advanceProgress(mark.getTimestamp()); + // If this is a proctime window, progress should not be advanced by watermark, or it'll + // disturb timer-based processing + if (isEventTime) { + windowProcessor.advanceProgress(mark.getTimestamp()); + } super.processWatermark(mark); } else { super.processWatermark(new Watermark(currentWatermark)); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java index 95f8717a998..2668e9bb904 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java +++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java @@ -151,6 +151,7 @@ public class WindowRankOperatorTest { .rankStart(1) .rankEnd(2) .windowEndIndex(WINDOW_END_INDEX) + .withEventTime(true) .build(); OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = @@ -257,6 +258,7 @@ public class WindowRankOperatorTest { .rankStart(2) .rankEnd(2) .windowEndIndex(WINDOW_END_INDEX) + .withEventTime(true) .build(); OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = @@ -346,6 +348,7 @@ public class WindowRankOperatorTest { .rankStart(1) .rankEnd(2) .windowEndIndex(WINDOW_END_INDEX) + .withEventTime(true) .build(); OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =