This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 87b36233512 [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#25032)
87b36233512 is described below

commit 87b362335128e62112cf34839837b2d635d93df2
Author: Kartikey Pant <kartikeypant....@gmail.com>
AuthorDate: Mon Jul 8 07:41:47 2024 +0530

    [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#25032)
    
    * [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup
    * Fix one of the added WindowOperatorTest methods to be public
---
 .../operators/windowing/WindowOperator.java        |  10 +-
 .../operators/windowing/WindowOperatorTest.java    | 151 +++++++++++++++++++++
 2 files changed, 155 insertions(+), 6 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index a45b5b68e72..a660a4c2ea6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -372,10 +372,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
                 if (triggerResult.isFire()) {
                     ACC contents = windowState.get();
-                    if (contents == null) {
-                        continue;
+                    if (contents != null) {
+                        emitWindowContents(actualWindow, contents);
                     }
-                    emitWindowContents(actualWindow, contents);
                 }
 
                 if (triggerResult.isPurge()) {
@@ -405,10 +404,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
                 if (triggerResult.isFire()) {
                     ACC contents = windowState.get();
-                    if (contents == null) {
-                        continue;
+                    if (contents != null) {
+                        emitWindowContents(window, contents);
                     }
-                    emitWindowContents(window, contents);
                 }
 
                 if (triggerResult.isPurge()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index bc5ef7f0b05..dda2728ab44 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -84,6 +84,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -3069,6 +3070,114 @@ public class WindowOperatorTest extends TestLogger {
         testHarness.close();
     }
 
+    @Test
+    public void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() 
throws Exception {
+        final int windowSize = 2;
+        final long lateness = 1;
+
+        ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+                new ListStateDescriptor<>(
+                        "window-contents",
+                        STRING_INT_TUPLE.createSerializer(new 
ExecutionConfig()));
+
+        WindowOperator<
+                        String,
+                        Tuple2<String, Integer>,
+                        Iterable<Tuple2<String, Integer>>,
+                        Tuple2<String, Integer>,
+                        TimeWindow>
+                operator =
+                        new WindowOperator<>(
+                                
TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
+                                new TimeWindow.Serializer(),
+                                new TupleKeySelector(),
+                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+                                        new ExecutionConfig()),
+                                windowStateDesc,
+                                new InternalIterableWindowFunction<>(new 
EmptyReturnFunction()),
+                                new 
FireEverytimeOnElementAndEventTimeTrigger(),
+                                lateness,
+                                null /* late data output tag */);
+
+        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+        // normal element
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("test_key", 
1), 1000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1599));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1699));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1799));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1999));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(2000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[]");
+        testHarness.processWatermark(new Watermark(5000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[]");
+
+        expected.add(new Watermark(1599));
+        expected.add(new Watermark(1699));
+        expected.add(new Watermark(1799));
+        expected.add(new Watermark(1999)); // here it fires and purges
+        expected.add(new Watermark(2000)); // here is the cleanup timer
+        expected.add(new Watermark(5000));
+
+        TestHarnessUtil.assertOutputEqualsSorted(
+                "Output was not correct.",
+                expected,
+                testHarness.getOutput(),
+                new Tuple2ResultSortComparator());
+        testHarness.close();
+    }
+
     // ------------------------------------------------------------------------
     //  UDFs
     // ------------------------------------------------------------------------
@@ -3091,6 +3200,20 @@ public class WindowOperatorTest extends TestLogger {
         }
     }
 
+    private static class EmptyReturnFunction
+            implements WindowFunction<
+                    Tuple2<String, Integer>, Tuple2<String, Integer>, String, 
TimeWindow> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void apply(
+                String k,
+                TimeWindow window,
+                Iterable<Tuple2<String, Integer>> input,
+                Collector<Tuple2<String, Integer>> out)
+                throws Exception {}
+    }
+
     private static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
         private static final long serialVersionUID = 1L;
 
@@ -3361,4 +3484,32 @@ public class WindowOperatorTest extends TestLogger {
             return "EventTimeTrigger()";
         }
     }
+
+    private static class FireEverytimeOnElementAndEventTimeTrigger
+            extends Trigger<Tuple2<String, Integer>, TimeWindow> {
+        @Override
+        public TriggerResult onElement(
+                Tuple2<String, Integer> element,
+                long timestamp,
+                TimeWindow window,
+                TriggerContext ctx)
+                throws Exception {
+            return TriggerResult.FIRE;
+        }
+
+        @Override
+        public TriggerResult onProcessingTime(long time, TimeWindow window, 
TriggerContext ctx)
+                throws Exception {
+            return TriggerResult.FIRE;
+        }
+
+        @Override
+        public TriggerResult onEventTime(long time, TimeWindow window, 
TriggerContext ctx)
+                throws Exception {
+            return TriggerResult.FIRE;
+        }
+
+        @Override
+        public void clear(TimeWindow window, TriggerContext ctx) throws 
Exception {}
+    }
 }

Reply via email to