This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b3b709f46ca [FLINK-35885][table] Prohibit advancing the progress of processing time window through watermark
b3b709f46ca is described below
commit b3b709f46ca7b12d9a26192a60cdde790d8523b9
Author: Xuyang <[email protected]>
AuthorDate: Wed Aug 7 23:26:11 2024 +0800
[FLINK-35885][table] Prohibit advancing the progress of processing time window through watermark
This closes #25119
---
.../nodes/exec/stream/StreamExecWindowRank.java | 1 +
.../harness/WindowAggregateHarnessTest.scala | 46 ++++++++++++++++++++++
.../aggregate/window/WindowAggOperatorBuilder.java | 2 +-
.../RowTimeWindowDeduplicateOperatorBuilder.java | 2 +-
.../rank/window/WindowRankOperatorBuilder.java | 11 +++++-
.../window/tvf/common/WindowAggOperator.java | 27 ++++++++-----
.../rank/window/WindowRankOperatorTest.java | 3 ++
7 files changed, 79 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
index 65471465923..7be4ca61b11 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
@@ -252,6 +252,7 @@ public class StreamExecWindowRank extends ExecNodeBase<RowData>
.rankStart(constantRankRange.getRankStart())
.rankEnd(constantRankRange.getRankEnd())
.windowEndIndex(windowEndIndex)
+ .withEventTime(windowing.isRowtime())
.build();
OneInputTransformation<RowData, RowData> transform =
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
index a4c489cffed..5c5b7100132 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.runtime.harness
import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.table.api._
@@ -687,6 +688,51 @@ class WindowAggregateHarnessTest(backend: StateBackendMode, shiftTimeZone: ZoneI
testHarness1.close()
}
+ @TestTemplate
+ def testProcessingTimeTumbleWindowWithFutureWatermark(): Unit = {
+ val (testHarness, outputTypes) =
+ createProcessingTimeWindowOperator(TUMBLE, isCDCSource = false)
+ val assertor = new RowDataHarnessAssertor(outputTypes)
+
+ testHarness.open()
+
+ // mock a large watermark arrives before proctime
+ testHarness.processWatermark(10000L)
+
+ testHarness.setProcessingTime(1000L)
+ testHarness.processElement(insertRecord("a", 1d, "str1", null))
+ testHarness.setProcessingTime(2000L)
+ testHarness.processElement(insertRecord("a", 2d, "str2", null))
+ testHarness.setProcessingTime(3000L)
+ testHarness.processElement(insertRecord("a", 3d, "str2", null))
+
+ testHarness.setProcessingTime(6000L)
+ testHarness.processElement(insertRecord("a", 4d, "str1", null))
+
+ testHarness.setProcessingTime(50000L)
+
+ val expected = new ConcurrentLinkedQueue[Object]()
+ expected.add(new Watermark(10000L))
+ expected.add(
+ insertRecord(
+ "a",
+ 3L,
+ 3.0d,
+ 2L,
+ localMills("1970-01-01T00:00:00"),
+ localMills("1970-01-01T00:00:05")))
+ expected.add(
+ insertRecord(
+ "a",
+ 1L,
+ 4.0d,
+ 1L,
+ localMills("1970-01-01T00:00:05"),
+ localMills("1970-01-01T00:00:10")))
+ assertor.assertOutputEqualsSorted("result mismatch", expected,
testHarness.getOutput)
+ testHarness.close()
+ }
+
private def createProcessingTimeWindowOperator(testWindow: String,
isCDCSource: Boolean)
: (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData],
Array[LogicalType]) = {
val windowDDL = testWindow match {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
index 77494b721e0..be368f5e6f9 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
@@ -153,7 +153,7 @@ public class WindowAggOperatorBuilder {
} else {
windowProcessor = buildUnslicingWindowProcessor();
}
- return new WindowAggOperator<>(windowProcessor);
+ return new WindowAggOperator<>(windowProcessor,
assigner.isEventTime());
}
@SuppressWarnings("unchecked")
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
index 16f4228f3c8..ce152cca741 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
@@ -108,6 +108,6 @@ public class RowTimeWindowDeduplicateOperatorBuilder {
final SlicingWindowProcessor<Long> windowProcessor =
new RowTimeWindowDeduplicateProcessor(
inputSerializer, bufferFactory, windowEndIndex,
shiftTimeZone);
- return new WindowAggOperator<>(windowProcessor);
+ return new WindowAggOperator<>(windowProcessor, true);
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
index cf0b8f68d76..807197ae076 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
@@ -50,6 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* .rankStart(0)
* .rankEnd(100)
* .windowEndIndex(windowEndIndex)
+ * .withEventTime(true)
* .build();
* </pre>
*/
@@ -68,6 +69,7 @@ public class WindowRankOperatorBuilder {
private long rankEnd = -1;
private int windowEndIndex = -1;
private ZoneId shiftTimeZone;
+ private Boolean isEventTime;
public WindowRankOperatorBuilder inputSerializer(
AbstractRowDataSerializer<RowData> inputSerializer) {
@@ -116,11 +118,17 @@ public class WindowRankOperatorBuilder {
return this;
}
+ public WindowRankOperatorBuilder withEventTime(Boolean isEventTime) {
+ this.isEventTime = isEventTime;
+ return this;
+ }
+
public WindowAggOperator<RowData, ?> build() {
checkNotNull(inputSerializer);
checkNotNull(keySerializer);
checkNotNull(sortKeySelector);
checkNotNull(generatedSortKeyComparator);
+ checkNotNull(isEventTime);
checkArgument(
rankStart > 0,
String.format("Illegal rank start %s, it should be positive!",
rankStart));
@@ -152,6 +160,7 @@ public class WindowRankOperatorBuilder {
outputRankNumber,
windowEndIndex,
shiftTimeZone);
- return new WindowAggOperator<>(windowProcessor);
+ // Processing time Window TopN is not supported yet.
+ return new WindowAggOperator<>(windowProcessor, isEventTime);
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
index de797ed934a..b94c1b0f806 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
@@ -103,32 +103,35 @@ public final class WindowAggOperator<K, W> extends TableStreamOperator<RowData>
private static final String WATERMARK_LATENCY_METRIC_NAME =
"watermarkLatency";
/** The concrete window operator implementation. */
- protected final WindowProcessor<W> windowProcessor;
+ private final WindowProcessor<W> windowProcessor;
+
+ private final boolean isEventTime;
// ------------------------------------------------------------------------
/** This is used for emitting elements with a given timestamp. */
- protected transient TimestampedCollector<RowData> collector;
+ private transient TimestampedCollector<RowData> collector;
/** The service to register timers. */
- protected transient InternalTimerService<W> internalTimerService;
+ private transient InternalTimerService<W> internalTimerService;
/** The tracked processing time triggered last time. */
- protected transient long lastTriggeredProcessingTime;
+ private transient long lastTriggeredProcessingTime;
/** The operator state to store watermark. */
- protected transient ListState<Long> watermarkState;
+ private transient ListState<Long> watermarkState;
// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------
- protected transient Counter numLateRecordsDropped;
- protected transient Meter lateRecordsDroppedRate;
- protected transient Gauge<Long> watermarkLatency;
+ private transient Counter numLateRecordsDropped;
+ private transient Meter lateRecordsDroppedRate;
+ private transient Gauge<Long> watermarkLatency;
- public WindowAggOperator(WindowProcessor<W> windowProcessor) {
+ public WindowAggOperator(WindowProcessor<W> windowProcessor, boolean
isEventTime) {
this.windowProcessor = windowProcessor;
+ this.isEventTime = isEventTime;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -222,7 +225,11 @@ public final class WindowAggOperator<K, W> extends TableStreamOperator<RowData>
@Override
public void processWatermark(Watermark mark) throws Exception {
if (mark.getTimestamp() > currentWatermark) {
- windowProcessor.advanceProgress(mark.getTimestamp());
+ // If this is a proctime window, progress should not be advanced
by watermark, or it'll
+ // disturb timer-based processing
+ if (isEventTime) {
+ windowProcessor.advanceProgress(mark.getTimestamp());
+ }
super.processWatermark(mark);
} else {
super.processWatermark(new Watermark(currentWatermark));
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
index 95f8717a998..2668e9bb904 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
@@ -151,6 +151,7 @@ public class WindowRankOperatorTest {
.rankStart(1)
.rankEnd(2)
.windowEndIndex(WINDOW_END_INDEX)
+ .withEventTime(true)
.build();
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
@@ -257,6 +258,7 @@ public class WindowRankOperatorTest {
.rankStart(2)
.rankEnd(2)
.windowEndIndex(WINDOW_END_INDEX)
+ .withEventTime(true)
.build();
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
@@ -346,6 +348,7 @@ public class WindowRankOperatorTest {
.rankStart(1)
.rankEnd(2)
.windowEndIndex(WINDOW_END_INDEX)
+ .withEventTime(true)
.build();
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =