This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 08649fd6981 [FLINK-35885][table] Prohibit advancing the progress of processing time window through watermark
08649fd6981 is described below

commit 08649fd6981655bcd131c76ef36f6dd074566dd0
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Fri Aug 9 16:10:29 2024 +0800

    [FLINK-35885][table] Prohibit advancing the progress of processing time window through watermark
    
    This closes #25180
---
 .../nodes/exec/stream/StreamExecWindowRank.java    |  1 +
 .../harness/WindowAggregateHarnessTest.scala       | 46 ++++++++++++++++++++++
 .../aggregate/window/WindowAggOperatorBuilder.java |  2 +-
 .../RowTimeWindowDeduplicateOperatorBuilder.java   |  2 +-
 .../rank/window/WindowRankOperatorBuilder.java     | 11 +++++-
 .../window/tvf/common/WindowAggOperator.java       | 27 ++++++++-----
 .../rank/window/WindowRankOperatorTest.java        |  3 ++
 7 files changed, 79 insertions(+), 13 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
index 65471465923..7be4ca61b11 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowRank.java
@@ -252,6 +252,7 @@ public class StreamExecWindowRank extends 
ExecNodeBase<RowData>
                         .rankStart(constantRankRange.getRankStart())
                         .rankEnd(constantRankRange.getRankEnd())
                         .windowEndIndex(windowEndIndex)
+                        .withEventTime(windowing.isRowtime())
                         .build();
 
         OneInputTransformation<RowData, RowData> transform =
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
index b33303ba22a..f46a992592f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/WindowAggregateHarnessTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.runtime.harness
 
 import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
 import org.apache.flink.table.api._
@@ -685,6 +686,51 @@ class WindowAggregateHarnessTest(backend: 
StateBackendMode, shiftTimeZone: ZoneI
     testHarness1.close()
   }
 
+  @TestTemplate
+  def testProcessingTimeTumbleWindowWithFutureWatermark(): Unit = {
+    val (testHarness, outputTypes) =
+      createProcessingTimeWindowOperator(TUMBLE, isCDCSource = false)
+    val assertor = new RowDataHarnessAssertor(outputTypes)
+
+    testHarness.open()
+
+    // mock a large watermark that arrives before processing time has advanced
+    testHarness.processWatermark(10000L)
+
+    testHarness.setProcessingTime(1000L)
+    testHarness.processElement(insertRecord("a", 1d, "str1", null))
+    testHarness.setProcessingTime(2000L)
+    testHarness.processElement(insertRecord("a", 2d, "str2", null))
+    testHarness.setProcessingTime(3000L)
+    testHarness.processElement(insertRecord("a", 3d, "str2", null))
+
+    testHarness.setProcessingTime(6000L)
+    testHarness.processElement(insertRecord("a", 4d, "str1", null))
+
+    testHarness.setProcessingTime(50000L)
+
+    val expected = new ConcurrentLinkedQueue[Object]()
+    expected.add(new Watermark(10000L))
+    expected.add(
+      insertRecord(
+        "a",
+        3L,
+        3.0d,
+        2L,
+        localMills("1970-01-01T00:00:00"),
+        localMills("1970-01-01T00:00:05")))
+    expected.add(
+      insertRecord(
+        "a",
+        1L,
+        4.0d,
+        1L,
+        localMills("1970-01-01T00:00:05"),
+        localMills("1970-01-01T00:00:10")))
+    assertor.assertOutputEqualsSorted("result mismatch", expected, 
testHarness.getOutput)
+    testHarness.close()
+  }
+
   private def createProcessingTimeWindowOperator(testWindow: String, 
isCDCSource: Boolean)
       : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], 
Array[LogicalType]) = {
     val windowDDL = testWindow match {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
index 77494b721e0..be368f5e6f9 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java
@@ -153,7 +153,7 @@ public class WindowAggOperatorBuilder {
         } else {
             windowProcessor = buildUnslicingWindowProcessor();
         }
-        return new WindowAggOperator<>(windowProcessor);
+        return new WindowAggOperator<>(windowProcessor, 
assigner.isEventTime());
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
index 16f4228f3c8..ce152cca741 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java
@@ -108,6 +108,6 @@ public class RowTimeWindowDeduplicateOperatorBuilder {
         final SlicingWindowProcessor<Long> windowProcessor =
                 new RowTimeWindowDeduplicateProcessor(
                         inputSerializer, bufferFactory, windowEndIndex, 
shiftTimeZone);
-        return new WindowAggOperator<>(windowProcessor);
+        return new WindowAggOperator<>(windowProcessor, true);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
index cf0b8f68d76..807197ae076 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java
@@ -50,6 +50,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *   .rankStart(0)
  *   .rankEnd(100)
  *   .windowEndIndex(windowEndIndex)
+ *   .withEventTime(true)
  *   .build();
  * </pre>
  */
@@ -68,6 +69,7 @@ public class WindowRankOperatorBuilder {
     private long rankEnd = -1;
     private int windowEndIndex = -1;
     private ZoneId shiftTimeZone;
+    private Boolean isEventTime;
 
     public WindowRankOperatorBuilder inputSerializer(
             AbstractRowDataSerializer<RowData> inputSerializer) {
@@ -116,11 +118,17 @@ public class WindowRankOperatorBuilder {
         return this;
     }
 
+    public WindowRankOperatorBuilder withEventTime(Boolean isEventTime) {
+        this.isEventTime = isEventTime;
+        return this;
+    }
+
     public WindowAggOperator<RowData, ?> build() {
         checkNotNull(inputSerializer);
         checkNotNull(keySerializer);
         checkNotNull(sortKeySelector);
         checkNotNull(generatedSortKeyComparator);
+        checkNotNull(isEventTime);
         checkArgument(
                 rankStart > 0,
                 String.format("Illegal rank start %s, it should be positive!", 
rankStart));
@@ -152,6 +160,7 @@ public class WindowRankOperatorBuilder {
                         outputRankNumber,
                         windowEndIndex,
                         shiftTimeZone);
-        return new WindowAggOperator<>(windowProcessor);
+        // Processing time Window TopN is not supported yet.
+        return new WindowAggOperator<>(windowProcessor, isEventTime);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
index de797ed934a..b94c1b0f806 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
@@ -103,32 +103,35 @@ public final class WindowAggOperator<K, W> extends 
TableStreamOperator<RowData>
     private static final String WATERMARK_LATENCY_METRIC_NAME = 
"watermarkLatency";
 
     /** The concrete window operator implementation. */
-    protected final WindowProcessor<W> windowProcessor;
+    private final WindowProcessor<W> windowProcessor;
+
+    private final boolean isEventTime;
 
     // ------------------------------------------------------------------------
 
     /** This is used for emitting elements with a given timestamp. */
-    protected transient TimestampedCollector<RowData> collector;
+    private transient TimestampedCollector<RowData> collector;
 
     /** The service to register timers. */
-    protected transient InternalTimerService<W> internalTimerService;
+    private transient InternalTimerService<W> internalTimerService;
 
     /** The tracked processing time triggered last time. */
-    protected transient long lastTriggeredProcessingTime;
+    private transient long lastTriggeredProcessingTime;
 
     /** The operator state to store watermark. */
-    protected transient ListState<Long> watermarkState;
+    private transient ListState<Long> watermarkState;
 
     // ------------------------------------------------------------------------
     // Metrics
     // ------------------------------------------------------------------------
 
-    protected transient Counter numLateRecordsDropped;
-    protected transient Meter lateRecordsDroppedRate;
-    protected transient Gauge<Long> watermarkLatency;
+    private transient Counter numLateRecordsDropped;
+    private transient Meter lateRecordsDroppedRate;
+    private transient Gauge<Long> watermarkLatency;
 
-    public WindowAggOperator(WindowProcessor<W> windowProcessor) {
+    public WindowAggOperator(WindowProcessor<W> windowProcessor, boolean 
isEventTime) {
         this.windowProcessor = windowProcessor;
+        this.isEventTime = isEventTime;
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
@@ -222,7 +225,11 @@ public final class WindowAggOperator<K, W> extends 
TableStreamOperator<RowData>
     @Override
     public void processWatermark(Watermark mark) throws Exception {
         if (mark.getTimestamp() > currentWatermark) {
-            windowProcessor.advanceProgress(mark.getTimestamp());
+            // If this is a proctime window, progress should not be advanced by watermark, or it'll
+            // disturb timer-based processing
+            if (isEventTime) {
+                windowProcessor.advanceProgress(mark.getTimestamp());
+            }
             super.processWatermark(mark);
         } else {
             super.processWatermark(new Watermark(currentWatermark));
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
index 95f8717a998..2668e9bb904 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java
@@ -151,6 +151,7 @@ public class WindowRankOperatorTest {
                         .rankStart(1)
                         .rankEnd(2)
                         .windowEndIndex(WINDOW_END_INDEX)
+                        .withEventTime(true)
                         .build();
 
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
@@ -257,6 +258,7 @@ public class WindowRankOperatorTest {
                         .rankStart(2)
                         .rankEnd(2)
                         .windowEndIndex(WINDOW_END_INDEX)
+                        .withEventTime(true)
                         .build();
 
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
@@ -346,6 +348,7 @@ public class WindowRankOperatorTest {
                         .rankStart(1)
                         .rankEnd(2)
                         .windowEndIndex(WINDOW_END_INDEX)
+                        .withEventTime(true)
                         .build();
 
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =

Reply via email to