This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 6c4dae8  [FLINK-5717][datastream] Fix NPE and lost timer during window merging for ContinuousProcessingTimeTrigger
6c4dae8 is described below

commit 6c4dae80211a65d1b1219bd53e38855ed4f0e7e5
Author: Kezhu Wang <kez...@gmail.com>
AuthorDate: Mon Mar 15 14:59:46 2021 +0800

    [FLINK-5717][datastream] Fix NPE and lost timer during window merging for ContinuousProcessingTimeTrigger
    
    This fixes:
    * NPE in clear() when the window's state has already been merged into another window.
    * Lost timer, because no timer was registered for the newly merged window.
    
    This closes #15241
---
 .../triggers/ContinuousProcessingTimeTrigger.java  |  18 ++-
 .../ContinuousProcessingTimeTriggerTest.java       | 162 +++++++++++++++++++++
 2 files changed, 176 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index b4cdb0d..c437ba7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -87,10 +87,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
 
     @Override
     public void clear(W window, TriggerContext ctx) throws Exception {
+        // State could be merged into new window.
         ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-        long timestamp = fireTimestamp.get();
-        ctx.deleteProcessingTimeTimer(timestamp);
-        fireTimestamp.clear();
+        Long timestamp = fireTimestamp.get();
+        if (timestamp != null) {
+            ctx.deleteProcessingTimeTimer(timestamp);
+            fireTimestamp.clear();
+        }
     }
 
     @Override
@@ -99,8 +102,15 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
     }
 
     @Override
-    public void onMerge(W window, OnMergeContext ctx) {
+    public void onMerge(W window, OnMergeContext ctx) throws Exception {
+        // States for old windows will be lost after the call.
         ctx.mergePartitionedState(stateDesc);
+
+        // Register timer for this new window.
+        Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
+        if (nextFireTimestamp != null) {
+            ctx.registerProcessingTimeTimer(nextFireTimestamp);
+        }
     }
 
     @VisibleForTesting
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
new file mode 100644
index 0000000..a50ee92
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Objects;
+import java.util.stream.StreamSupport;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ContinuousProcessingTimeTrigger}. */
+public class ContinuousProcessingTimeTriggerTest {
+
+    private static final long NO_TIMESTAMP = Watermark.UNINITIALIZED.getTimestamp();
+
+    private static class WindowedInteger {
+        private final TimeWindow window;
+        private final int value;
+
+        public WindowedInteger(TimeWindow window, int value) {
+            this.window = window;
+            this.value = value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof WindowedInteger)) {
+                return false;
+            }
+            WindowedInteger other = (WindowedInteger) o;
+            return value == other.value && Objects.equals(window, other.window);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(window, value);
+        }
+
+        @Override
+        public String toString() {
+            return "WindowedInteger{" + "window=" + window + ", value=" + value + '}';
+        }
+    }
+
+    private static class IntegerSumWindowFunction
+            implements WindowFunction<Integer, WindowedInteger, Byte, TimeWindow> {
+        @Override
+        public void apply(
+                Byte key,
+                TimeWindow window,
+                Iterable<Integer> input,
+                Collector<WindowedInteger> out)
+                throws Exception {
+            int sum =
+                    StreamSupport.stream(input.spliterator(), false)
+                            .mapToInt(Integer::intValue)
+                            .sum();
+            out.collect(new WindowedInteger(window, sum));
+        }
+    }
+
+    @Test
+    public void testMergingWindows() throws Exception {
+        ContinuousProcessingTimeTrigger<TimeWindow> trigger =
+                ContinuousProcessingTimeTrigger.of(Time.milliseconds(5));
+
+        assertTrue(trigger.canMerge());
+
+        ListStateDescriptor<Integer> stateDesc =
+                new ListStateDescriptor<>(
+                        "window-contents",
+                        BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()));
+
+        WindowOperator<Byte, Integer, Iterable<Integer>, WindowedInteger, TimeWindow> operator =
+                new WindowOperator<>(
+                        ProcessingTimeSessionWindows.withGap(Time.milliseconds(10)),
+                        new TimeWindow.Serializer(),
+                        new NullByteKeySelector<>(),
+                        BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()),
+                        stateDesc,
+                        new InternalIterableWindowFunction<>(new IntegerSumWindowFunction()),
+                        trigger,
+                        0,
+                        null);
+
+        KeyedOneInputStreamOperatorTestHarness<Byte, Integer, WindowedInteger> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        operator, operator.getKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO);
+
+        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
+
+        testHarness.open();
+
+        // window [0, 10)
+        testHarness.getProcessingTimeService().setCurrentTime(0);
+        testHarness.processElement(1, NO_TIMESTAMP);
+
+        // window [2, 12) ==> [0, 12)
+        testHarness.getProcessingTimeService().setCurrentTime(2);
+        testHarness.processElement(2, NO_TIMESTAMP);
+
+        // Merged timer should still fire.
+        testHarness.getProcessingTimeService().setCurrentTime(5);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // Merged window should work as normal.
+        testHarness.getProcessingTimeService().setCurrentTime(9);
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        testHarness.getProcessingTimeService().setCurrentTime(10);
+        expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11));
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // There is no on-time firing for now.
+        testHarness.getProcessingTimeService().setCurrentTime(15);
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+
+        // Window is dropped already.
+        testHarness.getProcessingTimeService().setCurrentTime(100);
+        TestHarnessUtil.assertOutputEquals(
+                "Output mismatch", expectedOutput, testHarness.getOutput());
+    }
+}

Reply via email to