[FLINK-5969] Add WindowOperatorFrom12MigrationTest The binary snapshots for this were created on the Flink 1.2 branch.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c6377f2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c6377f2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c6377f2 Branch: refs/heads/master Commit: 2c6377f257cafcbaea3a5bf33dd27852ed05afec Parents: 84eea72 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Apr 24 17:13:27 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed May 3 16:25:57 2017 +0200 ---------------------------------------------------------------------- .../WindowOperatorFrom11MigrationTest.java | 896 ++++++++++++++++ .../WindowOperatorFrom12MigrationTest.java | 1014 ++++++++++++++++++ .../windowing/WindowOperatorMigrationTest.java | 896 ---------------- ...gration-test-accum-aligned-flink1.2-snapshot | Bin 0 -> 222 bytes ...igration-test-aggr-aligned-flink1.2-snapshot | Bin 0 -> 187 bytes ...tion-test-apply-event-time-flink1.2-snapshot | Bin 0 -> 2193 bytes ...test-apply-processing-time-flink1.2-snapshot | Bin 0 -> 2081 bytes ...ion-test-reduce-event-time-flink1.2-snapshot | Bin 0 -> 1988 bytes ...est-reduce-processing-time-flink1.2-snapshot | Bin 0 -> 1925 bytes ...sion-with-stateful-trigger-flink1.2-snapshot | Bin 0 -> 3703 bytes ...with-stateful-trigger-mint-flink1.2-snapshot | Bin 0 -> 494 bytes 11 files changed, 1910 insertions(+), 896 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java new file mode 100644 index 0000000..9ec1923 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom11MigrationTest.java @@ -0,0 +1,896 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType; +import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.junit.Test; + +import java.net.URL; +import java.util.Comparator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.fail; + +/** + * Tests for checking whether {@link WindowOperator} can restore from snapshots that were done + * using the Flink 1.1 {@link WindowOperator}. + * + * <p> + * This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.1 + * aligned processing-time windows operator. + * + * <p>For regenerating the binary snapshot file you have to run the commented out portion + * of each test on a checkout of the Flink 1.1 branch. + */ +public class WindowOperatorFrom11MigrationTest { + + private static String getResourceFilename(String filename) { + ClassLoader cl = WindowOperatorFrom11MigrationTest.class.getClassLoader(); + URL resource = cl.getResource(filename); + if (resource == null) { + throw new NullPointerException("Missing snapshot resource."); + } + return resource.getFile(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRestoreSessionWindowsWithCountTriggerFromFlink11() throws Exception { + + final int SESSION_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + PurgingTrigger.of(CountTrigger.of(4)), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + /* + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000)); + + // do snapshot and save to file + StreamTaskState snapshot = testHarness.snapshot(0, 0); + testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot"); + testHarness.close(); + */ + + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename( + "win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot")); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000)); + + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + // add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + testHarness.close(); + } + + /** + * This checks that we can restore from a virgin {@code WindowOperator} that has never seen + * any elements. + */ + @Test + @SuppressWarnings("unchecked") + public void testRestoreSessionWindowsWithCountTriggerInMintConditionFromFlink11() throws Exception { + + final int SESSION_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + PurgingTrigger.of(CountTrigger.of(4)), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + /* + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + // do snapshot and save to file + StreamTaskState snapshot = testHarness.snapshot(0, 0); + testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot"); + testHarness.close(); + */ + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename( + "win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot")); + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + // add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + testHarness.close(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRestoreReducingEventTimeWindowsFromFlink11() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + EventTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + /* + + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + + testHarness.processWatermark(new Watermark(999)); + expectedOutput.add(new Watermark(999)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.processWatermark(new Watermark(1999)); + expectedOutput.add(new Watermark(1999)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + // do snapshot and save to file + StreamTaskState snapshot = testHarness.snapshot(0L, 0L); + testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot"); + testHarness.close(); + + */ + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename( + "win-op-migration-test-reduce-event-time-flink1.1-snapshot")); + testHarness.open(); + + testHarness.processWatermark(new Watermark(2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); + expectedOutput.add(new Watermark(2999)); + + testHarness.processWatermark(new Watermark(3999)); + expectedOutput.add(new Watermark(3999)); + + testHarness.processWatermark(new Watermark(4999)); + expectedOutput.add(new Watermark(4999)); + + testHarness.processWatermark(new Watermark(5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999)); + expectedOutput.add(new Watermark(5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRestoreApplyEventTimeWindowsFromFlink11() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()), + EventTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + /* + + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + + testHarness.processWatermark(new Watermark(999)); + expectedOutput.add(new Watermark(999)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.processWatermark(new Watermark(1999)); + expectedOutput.add(new Watermark(1999)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + // do snapshot and save to file + StreamTaskState snapshot = testHarness.snapshot(0L, 0L); + testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot"); + testHarness.close(); + + */ + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename( + "win-op-migration-test-apply-event-time-flink1.1-snapshot")); + testHarness.open(); + + testHarness.processWatermark(new Watermark(2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); + expectedOutput.add(new Watermark(2999)); + + testHarness.processWatermark(new Watermark(3999)); + expectedOutput.add(new Watermark(3999)); + + testHarness.processWatermark(new Watermark(4999)); + expectedOutput.add(new Watermark(4999)); + + testHarness.processWatermark(new Watermark(5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999)); + expectedOutput.add(new Watermark(5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRestoreReducingProcessingTimeWindowsFromFlink11() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + /* + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider(); + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), timeServiceProvider); + + testHarness.configureForKeyedStream(new WindowOperatorTest.TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + timeServiceProvider.setCurrentTime(10); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1))); + + timeServiceProvider.setCurrentTime(3010); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1))); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new WindowOperatorTest.Tuple2ResultSortComparator()); + + // do snapshot and save to file + StreamTaskState snapshot = testHarness.snapshot(0L, 0L); + testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot"); + testHarness.close(); + */ + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename( + "win-op-migration-test-reduce-processing-time-flink1.1-snapshot")); + testHarness.open(); + + testHarness.setProcessingTime(3020); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3))); + + testHarness.setProcessingTime(6000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + @SuppressWarnings("unchecked") + public void testRestoreApplyProcessingTimeWindowsFromFlink11() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + /* + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider(); + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), timeServiceProvider); + + testHarness.configureForKeyedStream(new WindowOperatorTest.TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + timeServiceProvider.setCurrentTime(10); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1))); + + timeServiceProvider.setCurrentTime(3010); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1))); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new WindowOperatorTest.Tuple2ResultSortComparator()); + + // do snapshot and save to file + StreamTaskState snapshot = testHarness.snapshot(0L, 0L); + testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot"); + testHarness.close(); + */ + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename( + "win-op-migration-test-apply-processing-time-flink1.1-snapshot")); + testHarness.open(); + + testHarness.setProcessingTime(3020); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3))); + + testHarness.setProcessingTime(6000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testRestoreAggregatingAlignedProcessingTimeWindowsFromFlink11() throws Exception { + /* + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + AggregatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>> operator = + new AggregatingProcessingTimeWindowOperator<>( + new ReduceFunction<Tuple2<String, Integer>>() { + private static final long serialVersionUID = -8913160567151867987L; + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { + return new Tuple2<>(value1.f0, value1.f1 + value2.f1); + } + }, + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + inputType.createSerializer(new ExecutionConfig()), + 3000, + 3000); + + TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + testTimeProvider.setCurrentTime(3); + + // timestamp is ignored in processing time + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + + // do a snapshot, close and restore again + StreamTaskState snapshot = testHarness.snapshot(0L, 0L); + testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot"); + testHarness.close(); + + */ + + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */, + LegacyWindowOperatorType.FAST_AGGREGATING); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.setup(); + testHarness.initializeStateFromLegacyCheckpoint("src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot"); + testHarness.open(); + + testHarness.setProcessingTime(5000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + + testHarness.setProcessingTime(7000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.close(); + } + + @Test + public void testRestoreAccumulatingAlignedProcessingTimeWindowsFromFlink11() throws Exception { + /* + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + AccumulatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>> operator = + new AccumulatingProcessingTimeWindowOperator<>( + new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + + private static final long serialVersionUID = 6551516443265733803L; + + @Override + public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { + int sum = 0; + for (Tuple2<String, Integer> anInput : input) { + sum += anInput.f1; + } + out.collect(new Tuple2<>(s, sum)); + } + }, + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + inputType.createSerializer(new ExecutionConfig()), + 3000, + 3000); + + TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + testTimeProvider.setCurrentTime(3); + + // timestamp is ignored in processing time + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + + // do a snapshot, close and restore again + StreamTaskState snapshot = testHarness.snapshot(0L, 0L); + testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot"); + testHarness.close(); + + */ + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */, + LegacyWindowOperatorType.FAST_ACCUMULATING); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.setup(); + testHarness.initializeStateFromLegacyCheckpoint("src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot"); + testHarness.open(); + + testHarness.setProcessingTime(5000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + + testHarness.setProcessingTime(7000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.close(); + } + + + private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple2<String, Integer> value) throws Exception { + return value.f0; + } + } + + @SuppressWarnings("unchecked") + private static class Tuple2ResultSortComparator implements Comparator<Object> { + @Override + public int compare(Object o1, Object o2) { + if (o1 instanceof Watermark || o2 instanceof Watermark) { + return 0; + } else { + StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1; + StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2; + if (sr0.getTimestamp() != sr1.getTimestamp()) { + return (int) (sr0.getTimestamp() - sr1.getTimestamp()); + } + int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0); + if (comparison != 0) { + return comparison; + } else { + return sr0.getValue().f1 - sr1.getValue().f1; + } + } + } + } + + @SuppressWarnings("unchecked") + private static class Tuple3ResultSortComparator implements Comparator<Object> { + @Override + public int compare(Object o1, Object o2) { + if (o1 instanceof Watermark || o2 instanceof Watermark) { + return 0; + } else { + StreamRecord<Tuple3<String, Long, Long>> sr0 = (StreamRecord<Tuple3<String, Long, Long>>) o1; + StreamRecord<Tuple3<String, Long, Long>> sr1 = (StreamRecord<Tuple3<String, Long, Long>>) o2; + if (sr0.getTimestamp() != sr1.getTimestamp()) { + return (int) (sr0.getTimestamp() - sr1.getTimestamp()); + } + int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0); + if (comparison != 0) { + return comparison; + } else { + comparison = (int) (sr0.getValue().f1 - sr1.getValue().f1); + if (comparison != 0) { + return comparison; + } + return (int) (sr0.getValue().f2 - sr1.getValue().f2); + } + } + } + } + + public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return new Tuple2<>(value2.f0, value1.f1 + value2.f1); + } + } + + public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { + private static final long serialVersionUID = 1L; + + private boolean openCalled = false; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + openCalled = true; + } + + @Override + public void close() throws Exception { + super.close(); + } + + @Override + public void apply(String key, + W window, + Iterable<Tuple2<String, Integer>> input, + Collector<Tuple2<String, Integer>> out) throws Exception { + + if (!openCalled) { + fail("Open was not called"); + } + int sum = 0; + + for (Tuple2<String, Integer> t: input) { + sum += t.f1; + } + out.collect(new Tuple2<>(key, sum)); + + } + + } + + public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, Long, Long>> out) throws Exception { + int sum = 0; + for (Tuple2<String, Integer> i: values) { + sum += i.f1; + } + String resultString = key + "-" + sum; + out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd())); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java new file mode 100644 index 0000000..0d3a6dc --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorFrom12MigrationTest.java @@ -0,0 +1,1014 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import static org.junit.Assert.fail; + +import java.util.Comparator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType; +import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Tests for checking whether {@link WindowOperator} can restore from snapshots that were done + * using the Flink 1.2 {@link WindowOperator}. + * + * <p>This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink 1.2 + * aligned processing-time windows operator. + * + * <p>For regenerating the binary snapshot file you have to run the {@code write*()} method on + * the Flink 1.2 branch. + */ +public class WindowOperatorFrom12MigrationTest { + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeSessionWindowsWithCountTriggerSnapshot() throws Exception { + final int SESSION_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + PurgingTrigger.of(CountTrigger.of(4)), + 0, + null /* late data output tag */); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000)); + + // do snapshot and save to file + OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L); + + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot"); + + testHarness.close(); + } + + @Test + public void testRestoreSessionWindowsWithCountTrigger() throws Exception { + + final int SESSION_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + PurgingTrigger.of(CountTrigger.of(4)), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot"))); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000)); + + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + // add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + testHarness.close(); + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeSessionWindowsWithCountTriggerInMintConditionSnapshot() throws Exception { + + final int SESSION_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + PurgingTrigger.of(CountTrigger.of(4)), + 0, + null /* late data output tag */); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + // do snapshot and save to file + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot"); + + testHarness.close(); + } + + /** + * This checks that we can restore from a virgin {@code WindowOperator} that has never seen + * any elements. + */ + @Test + public void testRestoreSessionWindowsWithCountTriggerInMintCondition() throws Exception { + + final int SESSION_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new SessionWindowFunction()), + PurgingTrigger.of(CountTrigger.of(4)), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot"))); + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + // add an element that merges the two "key1" sessions, they should now have count 6, and therfore fire + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500)); + + expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L), 9999L)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); + + testHarness.close(); + } + + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeReducingEventTimeWindowsSnapshot() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + EventTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + + testHarness.processWatermark(new Watermark(999)); + expectedOutput.add(new Watermark(999)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.processWatermark(new Watermark(1999)); + expectedOutput.add(new Watermark(1999)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + // do snapshot and save to file + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot"); + + testHarness.close(); + } + + @Test + public void testRestoreReducingEventTimeWindows() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + EventTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-reduce-event-time-flink1.2-snapshot"))); + testHarness.open(); + + testHarness.processWatermark(new Watermark(2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); + expectedOutput.add(new Watermark(2999)); + + testHarness.processWatermark(new Watermark(3999)); + expectedOutput.add(new Watermark(3999)); + + testHarness.processWatermark(new Watermark(4999)); + expectedOutput.add(new Watermark(4999)); + + testHarness.processWatermark(new Watermark(5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999)); + expectedOutput.add(new Watermark(5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeApplyEventTimeWindowsSnapshot() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()), + EventTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + + testHarness.processWatermark(new Watermark(999)); + expectedOutput.add(new Watermark(999)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.processWatermark(new Watermark(1999)); + expectedOutput.add(new Watermark(1999)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + // do snapshot and save to file + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot"); + + testHarness.close(); + } + + @Test + public void testRestoreApplyEventTimeWindows() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()), + EventTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-apply-event-time-flink1.2-snapshot"))); + testHarness.open(); + + testHarness.processWatermark(new Watermark(2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); + expectedOutput.add(new Watermark(2999)); + + testHarness.processWatermark(new Watermark(3999)); + expectedOutput.add(new Watermark(3999)); + + testHarness.processWatermark(new Watermark(4999)); + expectedOutput.add(new Watermark(4999)); + + testHarness.processWatermark(new Watermark(5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999)); + expectedOutput.add(new Watermark(5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeReducingProcessingTimeWindowsSnapshot() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(10); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1))); + + testHarness.setProcessingTime(3010); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1))); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + // do snapshot and save to file + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot"); + + testHarness.close(); + + } + + @Test + public void testRestoreReducingProcessingTimeWindows() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-reduce-processing-time-flink1.2-snapshot"))); + testHarness.open(); + + testHarness.setProcessingTime(3020); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3))); + + testHarness.setProcessingTime(6000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeApplyProcessingTimeWindowsSnapshot() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(10); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1))); + + testHarness.setProcessingTime(3010); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1))); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + // do snapshot and save to file + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot"); + + testHarness.close(); + } + + @Test + public void testRestoreApplyProcessingTimeWindows() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-apply-processing-time-flink1.2-snapshot"))); + testHarness.open(); + + testHarness.setProcessingTime(3020); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3))); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3))); + + testHarness.setProcessingTime(6000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 5999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeAggregatingAlignedProcessingTimeWindowsSnapshot() throws Exception { + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + AggregatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>> operator = + new AggregatingProcessingTimeWindowOperator<>( + new ReduceFunction<Tuple2<String, Integer>>() { + private static final long serialVersionUID = -8913160567151867987L; + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { + return new Tuple2<>(value1.f0, value1.f1 + value2.f1); + } + }, + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + inputType.createSerializer(new ExecutionConfig()), + 3000, + 3000); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + testHarness.setProcessingTime(3); + + // timestamp is ignored in processing time + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + + // do a snapshot, close and restore again + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot"); + testHarness.close(); + } + + @Test + public void testRestoreAggregatingAlignedProcessingTimeWindows() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */, + LegacyWindowOperatorType.FAST_AGGREGATING); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.setup(); + testHarness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-aggr-aligned-flink1.2-snapshot"))); + testHarness.open(); + + testHarness.setProcessingTime(5000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + + testHarness.setProcessingTime(7000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.close(); + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeAlignedProcessingTimeWindowsSnapshot() throws Exception { + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + AccumulatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>> operator = + new AccumulatingProcessingTimeWindowOperator<>( + new InternalIterableWindowFunction<>(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { + + private static final long serialVersionUID = 6551516443265733803L; + + @Override + public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { + int sum = 0; + for (Tuple2<String, Integer> anInput : input) { + sum += anInput.f1; + } + out.collect(new Tuple2<>(s, sum)); + } + }), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + inputType.createSerializer(new ExecutionConfig()), + 3000, + 3000); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + testHarness.setProcessingTime(3); + + // timestamp is ignored in processing time + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + + // do a snapshot, close and restore again + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot"); + testHarness.close(); + } + + @Test + public void testRestoreAccumulatingAlignedProcessingTimeWindows() throws Exception { + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + ProcessingTimeTrigger.create(), + 0, + null /* late data output tag */, + LegacyWindowOperatorType.FAST_ACCUMULATING); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.setup(); + testHarness.initializeState( + OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename("win-op-migration-test-accum-aligned-flink1.2-snapshot"))); + testHarness.open(); + + testHarness.setProcessingTime(5000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); + + testHarness.setProcessingTime(7000); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); + + testHarness.close(); + } + + + private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple2<String, Integer> value) throws Exception { + return value.f0; + } + } + + @SuppressWarnings("unchecked") + private static class Tuple2ResultSortComparator implements Comparator<Object> { + @Override + public int compare(Object o1, Object o2) { + if (o1 instanceof Watermark || o2 instanceof Watermark) { + return 0; + } else { + StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1; + StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2; + if (sr0.getTimestamp() != sr1.getTimestamp()) { + return (int) (sr0.getTimestamp() - sr1.getTimestamp()); + } + int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0); + if (comparison != 0) { + return comparison; + } else { + return sr0.getValue().f1 - sr1.getValue().f1; + } + } + } + } + + @SuppressWarnings("unchecked") + private static class Tuple3ResultSortComparator implements Comparator<Object> { + @Override + public int compare(Object o1, Object o2) { + if (o1 instanceof Watermark || o2 instanceof Watermark) { + return 0; + } else { + StreamRecord<Tuple3<String, Long, Long>> sr0 = (StreamRecord<Tuple3<String, Long, Long>>) o1; + StreamRecord<Tuple3<String, Long, Long>> sr1 = (StreamRecord<Tuple3<String, Long, Long>>) o2; + if (sr0.getTimestamp() != sr1.getTimestamp()) { + return (int) (sr0.getTimestamp() - sr1.getTimestamp()); + } + int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0); + if (comparison != 0) { + return comparison; + } else { + comparison = (int) (sr0.getValue().f1 - sr1.getValue().f1); + if (comparison != 0) { + return comparison; + } + return (int) (sr0.getValue().f1 - sr1.getValue().f1); + } + } + } + } + + public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return new Tuple2<>(value2.f0, value1.f1 + value2.f1); + } + } + + public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> { + private static final long serialVersionUID = 1L; + + private boolean openCalled = false; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + openCalled = true; + } + + @Override + public void close() throws Exception { + super.close(); + } + + @Override + public void apply(String key, + W window, + Iterable<Tuple2<String, Integer>> input, + Collector<Tuple2<String, Integer>> out) throws Exception { + + if (!openCalled) { + fail("Open was not called"); + } + int sum = 0; + + for (Tuple2<String, Integer> t: input) { + sum += t.f1; + } + out.collect(new Tuple2<>(key, sum)); + + } + + } + + public static class SessionWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String key, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple3<String, Long, Long>> out) throws Exception { + int sum = 0; + for (Tuple2<String, Integer> i: values) { + sum += i.f1; + } + String resultString = key + "-" + sum; + out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd())); + } + } +}