This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 6c4dae8 [FLINK-5717][datastream] Fix NPE and lost timer during window merging for ContinuousProcessingTimeTrigger 6c4dae8 is described below commit 6c4dae80211a65d1b1219bd53e38855ed4f0e7e5 Author: Kezhu Wang <kez...@gmail.com> AuthorDate: Mon Mar 15 14:59:46 2021 +0800 [FLINK-5717][datastream] Fix NPE and lost timer during window merging for ContinuousProcessingTimeTrigger This fixes: * NPE in clear() due to state merged out. * Timer lost due to no timer registration for new window. This closes #15241 --- .../triggers/ContinuousProcessingTimeTrigger.java | 18 ++- .../ContinuousProcessingTimeTriggerTest.java | 162 +++++++++++++++++++++ 2 files changed, 176 insertions(+), 4 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index b4cdb0d..c437ba7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -87,10 +87,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O @Override public void clear(W window, TriggerContext ctx) throws Exception { + // State could be merged into new window. ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); - long timestamp = fireTimestamp.get(); - ctx.deleteProcessingTimeTimer(timestamp); - fireTimestamp.clear(); + Long timestamp = fireTimestamp.get(); + if (timestamp != null) { + ctx.deleteProcessingTimeTimer(timestamp); + fireTimestamp.clear(); + } } @Override @@ -99,8 +102,15 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O } @Override - public void onMerge(W window, OnMergeContext ctx) { + public void onMerge(W window, OnMergeContext ctx) throws Exception { + // States for old windows will lose after the call. ctx.mergePartitionedState(stateDesc); + + // Register timer for this new window. + Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get(); + if (nextFireTimestamp != null) { + ctx.registerProcessingTimeTimer(nextFireTimestamp); + } } @VisibleForTesting diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java new file mode 100644 index 0000000..a50ee92 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.NullByteKeySelector; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import java.util.ArrayDeque; +import java.util.Objects; +import java.util.stream.StreamSupport; + +import static org.junit.Assert.assertTrue; + +/** Tests for {@link ContinuousProcessingTimeTrigger}. */ +public class ContinuousProcessingTimeTriggerTest { + + private static final long NO_TIMESTAMP = Watermark.UNINITIALIZED.getTimestamp(); + + private static class WindowedInteger { + private final TimeWindow window; + private final int value; + + public WindowedInteger(TimeWindow window, int value) { + this.window = window; + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof WindowedInteger)) { + return false; + } + WindowedInteger other = (WindowedInteger) o; + return value == other.value && Objects.equals(window, other.window); + } + + @Override + public int hashCode() { + return Objects.hash(window, value); + } + + @Override + public String toString() { + return "WindowedInteger{" + "window=" + window + ", value=" + value + '}'; + } + } + + private static class IntegerSumWindowFunction + implements WindowFunction<Integer, WindowedInteger, Byte, TimeWindow> { + @Override + public void apply( + Byte key, + TimeWindow window, + Iterable<Integer> input, + Collector<WindowedInteger> out) + throws Exception { + int sum = + StreamSupport.stream(input.spliterator(), false) + .mapToInt(Integer::intValue) + .sum(); + out.collect(new WindowedInteger(window, sum)); + } + } + + @Test + public void testMergingWindows() throws Exception { + ContinuousProcessingTimeTrigger<TimeWindow> trigger = + ContinuousProcessingTimeTrigger.of(Time.milliseconds(5)); + + assertTrue(trigger.canMerge()); + + ListStateDescriptor<Integer> stateDesc = + new ListStateDescriptor<>( + "window-contents", + BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())); + + WindowOperator<Byte, Integer, Iterable<Integer>, WindowedInteger, TimeWindow> operator = + new WindowOperator<>( + ProcessingTimeSessionWindows.withGap(Time.milliseconds(10)), + new TimeWindow.Serializer(), + new NullByteKeySelector<>(), + BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new IntegerSumWindowFunction()), + trigger, + 0, + null); + + KeyedOneInputStreamOperatorTestHarness<Byte, Integer, WindowedInteger> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + operator, operator.getKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO); + + ArrayDeque<Object> expectedOutput = new ArrayDeque<>(); + + testHarness.open(); + + // window [0, 10) + testHarness.getProcessingTimeService().setCurrentTime(0); + testHarness.processElement(1, NO_TIMESTAMP); + + // window [2, 12) ==> [0, 12) + testHarness.getProcessingTimeService().setCurrentTime(2); + testHarness.processElement(2, NO_TIMESTAMP); + + // Merged timer should still fire. + testHarness.getProcessingTimeService().setCurrentTime(5); + expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11)); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + + // Merged window should work as normal. + testHarness.getProcessingTimeService().setCurrentTime(9); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + + testHarness.getProcessingTimeService().setCurrentTime(10); + expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11)); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + + // There is no on time firing for now. + testHarness.getProcessingTimeService().setCurrentTime(15); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + + // Window is dropped already. + testHarness.getProcessingTimeService().setCurrentTime(100); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + } +}