This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b7a186b4553 [FLINK-38998][table] Fix cascaded event time
WindowAggregate does not produce data when offset is specified
b7a186b4553 is described below
commit b7a186b4553599477a1c7dc60701d59110fafb3b
Author: Sergey Kononov <[email protected]>
AuthorDate: Mon Mar 16 05:46:38 2026 +0700
[FLINK-38998][table] Fix cascaded event time WindowAggregate does not
produce data when offset is specified
---
.../runtime/stream/sql/WindowAggregateITCase.scala | 9 ++++++--
.../AbstractAsyncStateSliceWindowAggProcessor.java | 10 ++++++---
.../window/LocalSlicingWindowAggOperator.java | 10 ++++++---
.../AbstractSliceSyncStateWindowAggProcessor.java | 10 ++++++---
.../window/tvf/slicing/SliceAssigner.java | 3 +++
.../window/tvf/slicing/SliceAssigners.java | 25 ++++++++++++++++++++++
.../flink/table/runtime/util/TimeWindowUtil.java | 12 +++++++----
7 files changed, 64 insertions(+), 15 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 8485d1f418e..12ac1f73680 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -258,7 +258,7 @@ class WindowAggregateITCase(
| window_end,
| COUNT(DISTINCT `string`) AS cnt
| FROM TABLE(
- | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '1' DAY,
INTERVAL '8' HOUR))
+ | TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '30' SECONDS,
INTERVAL '15' SECONDS))
| GROUP BY `name`, window_start, window_end
|) GROUP BY cnt, window_start, window_end
""".stripMargin
@@ -268,7 +268,12 @@ class WindowAggregateITCase(
env.execute()
val expected =
- Seq("0,2020-10-09T08:00,2020-10-10T08:00,1",
"3,2020-10-09T08:00,2020-10-10T08:00,2")
+ Seq(
+ "2,2020-10-09T23:59:45,2020-10-10T00:00:15,1",
+ "3,2020-10-09T23:59:45,2020-10-10T00:00:15,1",
+ "2,2020-10-10T00:00:15,2020-10-10T00:00:45,1",
+ "0,2020-10-10T00:00:15,2020-10-10T00:00:45,1"
+ )
assertThat(sink.getAppendResults.sorted.mkString("\n"))
.isEqualTo(expected.sorted.mkString("\n"))
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
index cca05999ae3..9934ae46078 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
@@ -41,7 +41,7 @@ import java.util.List;
import static
org.apache.flink.table.runtime.util.AsyncStateUtils.REUSABLE_TRUE_STATE_FUTURE;
import static
org.apache.flink.table.runtime.util.AsyncStateUtils.REUSABLE_VOID_STATE_FUTURE;
-import static
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermark;
+import static
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermarkWithOffset;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
/** A base implementation of {@link AsyncStateSlicingWindowProcessor} for
window aggregate. */
@@ -168,8 +168,12 @@ public abstract class
AbstractAsyncStateSliceWindowAggProcessor
// they will register small timers and normal watermark will
flush the buffer
advanceFuture = windowBuffer.advanceProgress(currentKey,
currentProgress);
nextTriggerProgress =
- getNextTriggerWatermark(
- currentProgress, windowInterval,
shiftTimeZone, useDayLightSaving);
+ getNextTriggerWatermarkWithOffset(
+ currentProgress,
+ windowInterval,
+ sliceAssigner.getWindowOffset(),
+ shiftTimeZone,
+ useDayLightSaving);
}
}
return advanceFuture;
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
index 5086ad1451a..ed96794c7fd 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java
@@ -34,7 +34,7 @@ import
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner
import java.time.ZoneId;
import java.util.TimeZone;
-import static
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermark;
+import static
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermarkWithOffset;
/**
* The operator used for local window aggregation.
@@ -122,8 +122,12 @@ public class LocalSlicingWindowAggOperator extends
AbstractStreamOperator<RowDat
// we only need to call advanceProgress() when current
watermark may trigger window
windowBuffer.advanceProgress(currentWatermark);
nextTriggerWatermark =
- getNextTriggerWatermark(
- currentWatermark, windowInterval,
shiftTimezone, useDayLightSaving);
+ getNextTriggerWatermarkWithOffset(
+ currentWatermark,
+ windowInterval,
+ sliceAssigner.getWindowOffset(),
+ shiftTimezone,
+ useDayLightSaving);
}
}
super.processWatermark(mark);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceSyncStateWindowAggProcessor.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceSyncStateWindowAggProcessor.java
index 13797f9f717..acc986a930f 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceSyncStateWindowAggProcessor.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractSliceSyncStateWindowAggProcessor.java
@@ -32,7 +32,7 @@ import
org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindow
import java.time.ZoneId;
-import static
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermark;
+import static
org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermarkWithOffset;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
/** A base implementation of {@link SlicingSyncStateWindowProcessor} for
window aggregate. */
@@ -146,8 +146,12 @@ public abstract class
AbstractSliceSyncStateWindowAggProcessor
// they will register small timers and normal watermark will
flush the buffer
windowBuffer.advanceProgress(currentProgress);
nextTriggerProgress =
- getNextTriggerWatermark(
- currentProgress, windowInterval,
shiftTimeZone, useDayLightSaving);
+ getNextTriggerWatermarkWithOffset(
+ currentProgress,
+ windowInterval,
+ sliceAssigner.getWindowOffset(),
+ shiftTimeZone,
+ useDayLightSaving);
}
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java
index 834af35d30d..0b5bc15eefc 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java
@@ -65,4 +65,7 @@ public interface SliceAssigner extends WindowAssigner {
* slice assigned.
*/
long getSliceEndInterval();
+
+ /** Returns the window offset if supported or zero otherwise. */
+ long getWindowOffset();
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java
index 4294757154e..75b7ce6fac0 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigners.java
@@ -193,6 +193,11 @@ public final class SliceAssigners {
return size;
}
+ @Override
+ public long getWindowOffset() {
+ return offset;
+ }
+
@Override
public String getDescription() {
return String.format("TumblingWindow(size=%dms, offset=%dms)",
size, offset);
@@ -269,6 +274,11 @@ public final class SliceAssigners {
return sliceSize;
}
+ @Override
+ public long getWindowOffset() {
+ return offset;
+ }
+
@Override
public void mergeSlices(long sliceEnd, MergeCallback<Long,
Iterable<Long>> callback)
throws Exception {
@@ -394,6 +404,11 @@ public final class SliceAssigners {
return step;
}
+ @Override
+ public long getWindowOffset() {
+ return offset;
+ }
+
@Override
public void mergeSlices(long sliceEnd, MergeCallback<Long,
Iterable<Long>> callback)
throws Exception {
@@ -500,6 +515,11 @@ public final class SliceAssigners {
return innerAssigner.getSliceEndInterval();
}
+ @Override
+ public long getWindowOffset() {
+ return innerAssigner.getWindowOffset();
+ }
+
@Override
public boolean isEventTime() {
// it always works in event-time mode if input row has been
attached windows
@@ -627,6 +647,11 @@ public final class SliceAssigners {
return innerAssigner.getSliceEndInterval();
}
+ @Override
+ public long getWindowOffset() {
+ return innerAssigner.getWindowOffset();
+ }
+
@Override
public boolean isEventTime() {
// it always works in event-time mode if input row has been
attached slices
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
index 75a823b3376..354363c3f0e 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
@@ -183,8 +183,12 @@ public class TimeWindowUtil {
}
/** Method to get the next watermark to trigger window. */
- public static long getNextTriggerWatermark(
- long currentWatermark, long interval, ZoneId shiftTimezone,
boolean useDayLightSaving) {
+ public static long getNextTriggerWatermarkWithOffset(
+ long currentWatermark,
+ long interval,
+ long offset,
+ ZoneId shiftTimezone,
+ boolean useDayLightSaving) {
if (currentWatermark == Long.MAX_VALUE) {
return currentWatermark;
}
@@ -194,10 +198,10 @@ public class TimeWindowUtil {
if (useDayLightSaving) {
long utcWindowStart =
getWindowStartWithOffset(
- toUtcTimestampMills(currentWatermark,
shiftTimezone), 0L, interval);
+ toUtcTimestampMills(currentWatermark,
shiftTimezone), offset, interval);
triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval
- 1, shiftTimezone);
} else {
- long start = getWindowStartWithOffset(currentWatermark, 0L,
interval);
+ long start = getWindowStartWithOffset(currentWatermark, offset,
interval);
triggerWatermark = start + interval - 1;
}