This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 55b6978b6a0 [FLINK-37512] Fix the OOM for SlidingEventTimeWindows
(#26321)
55b6978b6a0 is described below
commit 55b6978b6a00c6a6cc1348eb70084dde3cf0d82d
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Jun 5 15:52:21 2025 +0800
[FLINK-37512] Fix the OOM for SlidingEventTimeWindows (#26321)
---
.../api/windowing/assigners/SlidingEventTimeWindows.java | 8 ++++++++
.../operators/windowing/SlidingEventTimeWindowsTest.java | 11 +++++++++++
2 files changed, 19 insertions(+)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 0191ad928eb..666a65a108d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -47,6 +47,8 @@ import java.util.List;
public class SlidingEventTimeWindows extends WindowAssigner<Object,
TimeWindow> {
private static final long serialVersionUID = 1L;
+ public static final int MAX_WINDOW_NUM = 10000000;
+
private final long size;
private final long slide;
@@ -59,6 +61,12 @@ public class SlidingEventTimeWindows extends
WindowAssigner<Object, TimeWindow>
"SlidingEventTimeWindows parameters must satisfy "
+ "abs(offset) < slide and size > 0");
}
+ if (size / slide > MAX_WINDOW_NUM) {
+ throw new IllegalArgumentException(
+ String.format(
+ "SlidingEventTimeWindows parameters must satisfy
size / slide <= %d",
+ MAX_WINDOW_NUM));
+ }
this.size = size;
this.slide = slide;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
index f7b1332200b..e5c00277d2d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -215,6 +215,17 @@ class SlidingEventTimeWindowsTest {
Duration.ofSeconds(11)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("abs(offset) < slide and size > 0");
+
+ assertThatThrownBy(
+ () ->
+ SlidingEventTimeWindows.of(
+ Duration.ofMillis(
+
SlidingEventTimeWindows.MAX_WINDOW_NUM + 1),
+ Duration.ofMillis(1)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ String.format(
+ "size / slide <= %d",
SlidingEventTimeWindows.MAX_WINDOW_NUM));
}
@Test