This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 55b6978b6a0 [FLINK-37512] Fix the OOM for SlidingEventTimeWindows (#26321)
55b6978b6a0 is described below

commit 55b6978b6a00c6a6cc1348eb70084dde3cf0d82d
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Jun 5 15:52:21 2025 +0800

    [FLINK-37512] Fix the OOM for SlidingEventTimeWindows (#26321)
---
 .../api/windowing/assigners/SlidingEventTimeWindows.java      |  8 ++++++++
 .../operators/windowing/SlidingEventTimeWindowsTest.java      | 11 +++++++++++
 2 files changed, 19 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 0191ad928eb..666a65a108d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -47,6 +47,8 @@ import java.util.List;
 public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
     private static final long serialVersionUID = 1L;
 
+    public static final int MAX_WINDOW_NUM = 10000000;
+
     private final long size;
 
     private final long slide;
@@ -59,6 +61,12 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
                     "SlidingEventTimeWindows parameters must satisfy "
                             + "abs(offset) < slide and size > 0");
         }
+        if (size / slide > MAX_WINDOW_NUM) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "SlidingEventTimeWindows parameters must satisfy size / slide <= %d",
+                            MAX_WINDOW_NUM));
+        }
 
         this.size = size;
         this.slide = slide;
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
index f7b1332200b..e5c00277d2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -215,6 +215,17 @@ class SlidingEventTimeWindowsTest {
                                         Duration.ofSeconds(11)))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("abs(offset) < slide and size > 0");
+
+        assertThatThrownBy(
+                        () ->
+                                SlidingEventTimeWindows.of(
+                                        Duration.ofMillis(
+                                                SlidingEventTimeWindows.MAX_WINDOW_NUM + 1),
+                                        Duration.ofMillis(1)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "size / slide <= %d", SlidingEventTimeWindows.MAX_WINDOW_NUM));
     }
 
     @Test

Reply via email to