This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push: new 87b36233512 [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#25032) 87b36233512 is described below commit 87b362335128e62112cf34839837b2d635d93df2 Author: Kartikey Pant <kartikeypant....@gmail.com> AuthorDate: Mon Jul 8 07:41:47 2024 +0530 [BP-1.18] [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup (#25032) * [FLINK-33192][runtime] Fix Memory Leak in WindowOperator due to Improper Timer Cleanup * Fix one of the added WindowOperatorTest methods to be public --- .../operators/windowing/WindowOperator.java | 10 +- .../operators/windowing/WindowOperatorTest.java | 151 +++++++++++++++++++++ 2 files changed, 155 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index a45b5b68e72..a660a4c2ea6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -372,10 +372,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (triggerResult.isFire()) { ACC contents = windowState.get(); - if (contents == null) { - continue; + if (contents != null) { + emitWindowContents(actualWindow, contents); } - emitWindowContents(actualWindow, contents); } if (triggerResult.isPurge()) { @@ -405,10 +404,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (triggerResult.isFire()) { ACC contents = windowState.get(); - if (contents == null) { - continue; + if (contents != null) { + emitWindowContents(window, contents); } - emitWindowContents(window, contents); } if (triggerResult.isPurge()) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index bc5ef7f0b05..dda2728ab44 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -84,6 +84,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -3069,6 +3070,114 @@ public class WindowOperatorTest extends TestLogger { testHarness.close(); } + @Test + public void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws Exception { + final int windowSize = 2; + final long lateness = 1; + + ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc = + new ListStateDescriptor<>( + "window-contents", + STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); + + WindowOperator< + String, + Tuple2<String, Integer>, + Iterable<Tuple2<String, Integer>>, + Tuple2<String, Integer>, + TimeWindow> + operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer( + new ExecutionConfig()), + windowStateDesc, + new InternalIterableWindowFunction<>(new EmptyReturnFunction()), + new FireEverytimeOnElementAndEventTimeTrigger(), + lateness, + null /* late data output tag */); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> + testHarness = createTestHarness(operator); + + testHarness.open(); + + ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("test_key", 1), 1000)); + assertThat( + operator.processContext + .windowState() + .getListState(windowStateDesc) + .get() + .toString()) + .isEqualTo("[(test_key,1)]"); + testHarness.processWatermark(new Watermark(1599)); + assertThat( + operator.processContext + .windowState() + .getListState(windowStateDesc) + .get() + .toString()) + .isEqualTo("[(test_key,1)]"); + testHarness.processWatermark(new Watermark(1699)); + assertThat( + operator.processContext + .windowState() + .getListState(windowStateDesc) + .get() + .toString()) + .isEqualTo("[(test_key,1)]"); + testHarness.processWatermark(new Watermark(1799)); + assertThat( + operator.processContext + .windowState() + .getListState(windowStateDesc) + .get() + .toString()) + .isEqualTo("[(test_key,1)]"); + testHarness.processWatermark(new Watermark(1999)); + assertThat( + operator.processContext + .windowState() + .getListState(windowStateDesc) + .get() + .toString()) + .isEqualTo("[(test_key,1)]"); + testHarness.processWatermark(new Watermark(2000)); + assertThat( + operator.processContext + .windowState() + .getListState(windowStateDesc) + .get() + .toString()) + .isEqualTo("[]"); + testHarness.processWatermark(new Watermark(5000)); + assertThat( + operator.processContext + .windowState() + .getListState(windowStateDesc) + .get() + .toString()) + .isEqualTo("[]"); + + expected.add(new Watermark(1599)); + expected.add(new Watermark(1699)); + expected.add(new Watermark(1799)); + expected.add(new Watermark(1999)); // here it fires and purges + expected.add(new Watermark(2000)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted( + "Output was not correct.", + expected, + testHarness.getOutput(), + new Tuple2ResultSortComparator()); + testHarness.close(); + } + // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------ @@ -3091,6 +3200,20 @@ public class WindowOperatorTest extends TestLogger { } } + private static class EmptyReturnFunction + implements WindowFunction< + Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> { + private static final long serialVersionUID = 1L; + + @Override + public void apply( + String k, + TimeWindow window, + Iterable<Tuple2<String, Integer>> input, + Collector<Tuple2<String, Integer>> out) + throws Exception {} + } + private static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @@ -3361,4 +3484,32 @@ public class WindowOperatorTest extends TestLogger { return "EventTimeTrigger()"; } } + + private static class FireEverytimeOnElementAndEventTimeTrigger + extends Trigger<Tuple2<String, Integer>, TimeWindow> { + @Override + public TriggerResult onElement( + Tuple2<String, Integer> element, + long timestamp, + TimeWindow window, + TriggerContext ctx) + throws Exception { + return TriggerResult.FIRE; + } + + @Override + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) + throws Exception { + return TriggerResult.FIRE; + } + + @Override + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) + throws Exception { + return TriggerResult.FIRE; + } + + @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception {} + } }