This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b7a186b4553 [FLINK-38998][table] Fix cascaded event time 
WindowAggregate does not produce data when offset is specified
b7a186b4553 is described below

commit b7a186b4553599477a1c7dc60701d59110fafb3b
Author: Sergey Kononov <[email protected]>
AuthorDate: Mon Mar 16 05:46:38 2026 +0700

    [FLINK-38998][table] Fix cascaded event time WindowAggregate does not 
produce data when offset is specified
---
 .../runtime/stream/sql/WindowAggregateITCase.scala |  9 ++++++--
 .../AbstractAsyncStateSliceWindowAggProcessor.java | 10 ++++++---
 .../window/LocalSlicingWindowAggOperator.java      | 10 ++++++---
 .../AbstractSliceSyncStateWindowAggProcessor.java  | 10 ++++++---
 .../window/tvf/slicing/SliceAssigner.java          |  3 +++
 .../window/tvf/slicing/SliceAssigners.java         | 25 ++++++++++++++++++++++
 .../flink/table/runtime/util/TimeWindowUtil.java   | 12 +++++++----
 7 files changed, 64 insertions(+), 15 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 8485d1f418e..12ac1f73680 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -258,7 +258,7 @@ class WindowAggregateITCase(
         |    window_end,
         |    COUNT(DISTINCT `string`) AS cnt
         |    FROM TABLE(
-        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '1' DAY, 
INTERVAL '8' HOUR))
+        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '30' SECONDS, 
INTERVAL '15' SECONDS))
         |    GROUP BY `name`, window_start, window_end
         |) GROUP BY cnt, window_start, window_end
       """.stripMargin
@@ -268,7 +268,12 @@ class WindowAggregateITCase(
     env.execute()
 
     val expected =
-      Seq("0,2020-10-09T08:00,2020-10-10T08:00,1", 
"3,2020-10-09T08:00,2020-10-10T08:00,2")
+      Seq(
+        "2,2020-10-09T23:59:45,2020-10-10T00:00:15,1",
+        "3,2020-10-09T23:59:45,2020-10-10T00:00:15,1",
+        "2,2020-10-10T00:00:15,2020-10-10T00:00:45,1",
+        "0,2020-10-10T00:00:15,2020-10-10T00:00:45,1"
+      )
     assertThat(sink.getAppendResults.sorted.mkString("\n"))
       .isEqualTo(expected.sorted.mkString("\n"))
   }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
index cca05999ae3..9934ae46078 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
@@ -41,7 +41,7 @@ import java.util.List;
 
 import static 
org.apache.flink.table.runtime.util.AsyncStateUtils.REUSABLE_TRUE_STATE_FUTURE;
 import static 
org.apache.flink.table.runtime.util.AsyncStateUtils.REUSABLE_VOID_STATE_FUTURE;
-import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermark;
+import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermarkWithOffset;
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
 
 /** A base implementation of {@link AsyncStateSlicingWindowProcessor} for 
window aggregate. */
@@ -168,8 +168,12 @@ public abstract class 
AbstractAsyncStateSliceWindowAggProcessor
                 // they will register small timers and normal watermark will 
flush the buffer
                 advanceFuture = windowBuffer.advanceProgress(currentKey, 
currentProgress);
                 nextTriggerProgress =
-                        getNextTriggerWatermark(
-                                currentProgress, windowInterval, 
shiftTimeZone, useDayLightSaving);
+                        getNextTriggerWatermarkWithOffset(
+                                currentProgress,
+                                windowInterval,
+                                sliceAssigner.getWindowOffset(),
+                                shiftTimeZone,
+                                useDayLightSaving);
             }
         }
         return advanceFuture;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index 5086ad1451a..ed96794c7fd 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner
 import java.time.ZoneId;
 import java.util.TimeZone;
 
-import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermark;
+import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermarkWithOffset;
 
 /**
  * The operator used for local window aggregation.
@@ -122,8 +122,12 @@ public class LocalSlicingWindowAggOperator extends 
AbstractStreamOperator<RowDat
                 // we only need to call advanceProgress() when current 
watermark may trigger window
                 windowBuffer.advanceProgress(currentWatermark);
                 nextTriggerWatermark =
-                        getNextTriggerWatermark(
-                                currentWatermark, windowInterval, 
shiftTimezone, useDayLightSaving);
+                        getNextTriggerWatermarkWithOffset(
+                                currentWatermark,
+                                windowInterval,
+                                sliceAssigner.getWindowOffset(),
+                                shiftTimezone,
+                                useDayLightSaving);
             }
         }
         super.processWatermark(mark);
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceSyncStateWindowAggProcessor.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceSyncStateWindowAggProcessor.java
index 13797f9f717..acc986a930f 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceSyncStateWindowAggProcessor.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceSyncStateWindowAggProcessor.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindow
 
 import java.time.ZoneId;
 
-import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermark;
+import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermarkWithOffset;
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
 
 /** A base implementation of {@link SlicingSyncStateWindowProcessor} for 
window aggregate. */
@@ -146,8 +146,12 @@ public abstract class 
AbstractSliceSyncStateWindowAggProcessor
                 // they will register small timers and normal watermark will 
flush the buffer
                 windowBuffer.advanceProgress(currentProgress);
                 nextTriggerProgress =
-                        getNextTriggerWatermark(
-                                currentProgress, windowInterval, 
shiftTimeZone, useDayLightSaving);
+                        getNextTriggerWatermarkWithOffset(
+                                currentProgress,
+                                windowInterval,
+                                sliceAssigner.getWindowOffset(),
+                                shiftTimeZone,
+                                useDayLightSaving);
             }
         }
     }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java
index 834af35d30d..0b5bc15eefc 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java
@@ -65,4 +65,7 @@ public interface SliceAssigner extends WindowAssigner {
      * slice assigned.
      */
     long getSliceEndInterval();
+
+    /** Returns the window offset if supported or zero otherwise. */
+    long getWindowOffset();
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java
index 4294757154e..75b7ce6fac0 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java
@@ -193,6 +193,11 @@ public final class SliceAssigners {
             return size;
         }
 
+        @Override
+        public long getWindowOffset() {
+            return offset;
+        }
+
         @Override
         public String getDescription() {
             return String.format("TumblingWindow(size=%dms, offset=%dms)", 
size, offset);
@@ -269,6 +274,11 @@ public final class SliceAssigners {
             return sliceSize;
         }
 
+        @Override
+        public long getWindowOffset() {
+            return offset;
+        }
+
         @Override
         public void mergeSlices(long sliceEnd, MergeCallback<Long, 
Iterable<Long>> callback)
                 throws Exception {
@@ -394,6 +404,11 @@ public final class SliceAssigners {
             return step;
         }
 
+        @Override
+        public long getWindowOffset() {
+            return offset;
+        }
+
         @Override
         public void mergeSlices(long sliceEnd, MergeCallback<Long, 
Iterable<Long>> callback)
                 throws Exception {
@@ -500,6 +515,11 @@ public final class SliceAssigners {
             return innerAssigner.getSliceEndInterval();
         }
 
+        @Override
+        public long getWindowOffset() {
+            return innerAssigner.getWindowOffset();
+        }
+
         @Override
         public boolean isEventTime() {
             // it always works in event-time mode if input row has been 
attached windows
@@ -627,6 +647,11 @@ public final class SliceAssigners {
             return innerAssigner.getSliceEndInterval();
         }
 
+        @Override
+        public long getWindowOffset() {
+            return innerAssigner.getWindowOffset();
+        }
+
         @Override
         public boolean isEventTime() {
             // it always works in event-time mode if input row has been 
attached slices
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
index 75a823b3376..354363c3f0e 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
@@ -183,8 +183,12 @@ public class TimeWindowUtil {
     }
 
     /** Method to get the next watermark to trigger window. */
-    public static long getNextTriggerWatermark(
-            long currentWatermark, long interval, ZoneId shiftTimezone, 
boolean useDayLightSaving) {
+    public static long getNextTriggerWatermarkWithOffset(
+            long currentWatermark,
+            long interval,
+            long offset,
+            ZoneId shiftTimezone,
+            boolean useDayLightSaving) {
         if (currentWatermark == Long.MAX_VALUE) {
             return currentWatermark;
         }
@@ -194,10 +198,10 @@ public class TimeWindowUtil {
         if (useDayLightSaving) {
             long utcWindowStart =
                     getWindowStartWithOffset(
-                            toUtcTimestampMills(currentWatermark, 
shiftTimezone), 0L, interval);
+                            toUtcTimestampMills(currentWatermark, 
shiftTimezone), offset, interval);
             triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval 
- 1, shiftTimezone);
         } else {
-            long start = getWindowStartWithOffset(currentWatermark, 0L, 
interval);
+            long start = getWindowStartWithOffset(currentWatermark, offset, 
interval);
             triggerWatermark = start + interval - 1;
         }
 

Reply via email to