http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
deleted file mode 100644
index 1168eb0..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ /dev/null
@@ -1,896 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
-import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
-import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.fail;
-
-/**
- * Tests for checking whether {@link WindowOperator} can restore from 
snapshots that were done
- * using the Flink 1.1 {@link WindowOperator}.
- *
- * <p>
- * This also checks whether {@link WindowOperator} can restore from a 
checkpoint of the Flink 1.1
- * aligned processing-time windows operator.
- *
- * <p>For regenerating the binary snapshot file you have to run the commented 
out portion
- * of each test on a checkout of the Flink 1.1 branch.
- */
-public class WindowOperatorMigrationTest {
-
-       private static String getResourceFilename(String filename) {
-               ClassLoader cl = 
WindowOperatorMigrationTest.class.getClassLoader();
-               URL resource = cl.getResource(filename);
-               if (resource == null) {
-                       throw new NullPointerException("Missing snapshot 
resource.");
-               }
-               return resource.getFile();
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testRestoreSessionWindowsWithCountTriggerFromFlink11() 
throws Exception {
-
-               final int SESSION_SIZE = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalIterableWindowFunction<>(new 
SessionWindowFunction()),
-                               PurgingTrigger.of(CountTrigger.of(4)),
-                               0,
-                               null /* late data output tag */);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               /*
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(operator);
-
-               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               // add elements out-of-order
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 0));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 2), 1000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2500));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 4), 3500));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 10));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 1000));
-
-               // do snapshot and save to file
-               StreamTaskState snapshot = testHarness.snapshot(0, 0);
-               testHarness.snaphotToFile(snapshot, 
"src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot");
-               testHarness.close();
-        */
-
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               
testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-                               
"win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot"));
-               testHarness.open();
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 2500));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 6000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 6500));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 7000));
-
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
-
-               // add an element that merges the two "key1" sessions, they 
should now have count 6, and therfore fire
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 10), 4500));
-
-               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 
10L, 10000L), 9999L));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
-
-               testHarness.close();
-       }
-
-       /**
-        * This checks that we can restore from a virgin {@code WindowOperator} 
that has never seen
-        * any elements.
-        */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void 
testRestoreSessionWindowsWithCountTriggerInMintConditionFromFlink11() throws 
Exception {
-
-               final int SESSION_SIZE = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> 
operator = new WindowOperator<>(
-                               
EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalIterableWindowFunction<>(new 
SessionWindowFunction()),
-                               PurgingTrigger.of(CountTrigger.of(4)),
-                               0,
-                               null /* late data output tag */);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               /*
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(operator);
-
-               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               // do snapshot and save to file
-               StreamTaskState snapshot = testHarness.snapshot(0, 0);
-               testHarness.snaphotToFile(snapshot, 
"src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot");
-               testHarness.close();
-               */
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               
testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-                               
"win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot"));
-               testHarness.open();
-
-               // add elements out-of-order
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 0));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 2), 1000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2500));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 4), 3500));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 10));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 1000));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 2500));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 6000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 6500));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3), 7000));
-
-               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 
0L, 6500L), 6499));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
-
-               // add an element that merges the two "key1" sessions, they 
should now have count 6, and therfore fire
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 10), 4500));
-
-               expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 
10L, 10000L), 9999L));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
-
-               testHarness.close();
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testRestoreReducingEventTimeWindowsFromFlink11() throws 
Exception {
-               final int WINDOW_SIZE = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
-                               new SumReducer(),
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-                               EventTimeTrigger.create(),
-                               0,
-                               null /* late data output tag */);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               /*
-
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(operator);
-
-               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               // add elements out-of-order
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 3999));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 3000));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 20));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 0));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 999));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1998));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1999));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
-
-               testHarness.processWatermark(new Watermark(999));
-               expectedOutput.add(new Watermark(999));
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.processWatermark(new Watermark(1999));
-               expectedOutput.add(new Watermark(1999));
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               // do snapshot and save to file
-               StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-               testHarness.snaphotToFile(snapshot, 
"src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot");
-               testHarness.close();
-
-               */
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               
testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-                               
"win-op-migration-test-reduce-event-time-flink1.1-snapshot"));
-               testHarness.open();
-
-               testHarness.processWatermark(new Watermark(2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
2999));
-               expectedOutput.add(new Watermark(2999));
-
-               testHarness.processWatermark(new Watermark(3999));
-               expectedOutput.add(new Watermark(3999));
-
-               testHarness.processWatermark(new Watermark(4999));
-               expectedOutput.add(new Watermark(4999));
-
-               testHarness.processWatermark(new Watermark(5999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
5999));
-               expectedOutput.add(new Watermark(5999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-               testHarness.close();
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testRestoreApplyEventTimeWindowsFromFlink11() throws 
Exception {
-               final int WINDOW_SIZE = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator = new WindowOperator<>(
-                               
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalIterableWindowFunction<>(new 
RichSumReducer<TimeWindow>()),
-                               EventTimeTrigger.create(),
-                               0,
-                               null /* late data output tag */);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               /*
-
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(operator);
-
-               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               // add elements out-of-order
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 3999));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 3000));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 20));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 0));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 999));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1998));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1999));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
-
-               testHarness.processWatermark(new Watermark(999));
-               expectedOutput.add(new Watermark(999));
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.processWatermark(new Watermark(1999));
-               expectedOutput.add(new Watermark(1999));
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               // do snapshot and save to file
-               StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-               testHarness.snaphotToFile(snapshot, 
"src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot");
-               testHarness.close();
-
-               */
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               
testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-                               
"win-op-migration-test-apply-event-time-flink1.1-snapshot"));
-               testHarness.open();
-
-               testHarness.processWatermark(new Watermark(2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
2999));
-               expectedOutput.add(new Watermark(2999));
-
-               testHarness.processWatermark(new Watermark(3999));
-               expectedOutput.add(new Watermark(3999));
-
-               testHarness.processWatermark(new Watermark(4999));
-               expectedOutput.add(new Watermark(4999));
-
-               testHarness.processWatermark(new Watermark(5999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
5999));
-               expectedOutput.add(new Watermark(5999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-               testHarness.close();
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testRestoreReducingProcessingTimeWindowsFromFlink11() 
throws Exception {
-               final int WINDOW_SIZE = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
-                               new SumReducer(),
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-                               ProcessingTimeTrigger.create(),
-                               0,
-                               null /* late data output tag */);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               /*
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-               TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), 
timeServiceProvider);
-
-               testHarness.configureForKeyedStream(new 
WindowOperatorTest.TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               timeServiceProvider.setCurrentTime(10);
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1)));
-
-               timeServiceProvider.setCurrentTime(3010);
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key3", 1)));
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 
2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 
2999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
WindowOperatorTest.Tuple2ResultSortComparator());
-
-               // do snapshot and save to file
-               StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-               testHarness.snaphotToFile(snapshot, 
"src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot");
-               testHarness.close();
-               */
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               
testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-                               
"win-op-migration-test-reduce-processing-time-flink1.1-snapshot"));
-               testHarness.open();
-
-               testHarness.setProcessingTime(3020);
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3)));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3)));
-
-               testHarness.setProcessingTime(6000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
5999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
5999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 
5999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-               testHarness.close();
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testRestoreApplyProcessingTimeWindowsFromFlink11() throws 
Exception {
-               final int WINDOW_SIZE = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> 
operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalIterableWindowFunction<>(new 
RichSumReducer<TimeWindow>()),
-                               ProcessingTimeTrigger.create(),
-                               0,
-                               null /* late data output tag */);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               /*
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-               TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), 
timeServiceProvider);
-
-               testHarness.configureForKeyedStream(new 
WindowOperatorTest.TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               timeServiceProvider.setCurrentTime(10);
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1)));
-
-               timeServiceProvider.setCurrentTime(3010);
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1)));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key3", 1)));
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 
2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 
2999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
WindowOperatorTest.Tuple2ResultSortComparator());
-
-               // do snapshot and save to file
-               StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-               testHarness.snaphotToFile(snapshot, 
"src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot");
-               testHarness.close();
-               */
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               
testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-                               
"win-op-migration-test-apply-processing-time-flink1.1-snapshot"));
-               testHarness.open();
-
-               testHarness.setProcessingTime(3020);
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3)));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 3)));
-
-               testHarness.setProcessingTime(6000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
5999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
5999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 
5999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-               testHarness.close();
-       }
-
-       @Test
-       public void 
testRestoreAggregatingAlignedProcessingTimeWindowsFromFlink11() throws 
Exception {
-               /*
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               AggregatingProcessingTimeWindowOperator<String, Tuple2<String, 
Integer>> operator =
-                       new AggregatingProcessingTimeWindowOperator<>(
-                               new ReduceFunction<Tuple2<String, Integer>>() {
-                                       private static final long 
serialVersionUID = -8913160567151867987L;
-
-                                       @Override
-                                       public Tuple2<String, Integer> 
reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws 
Exception {
-                                               return new Tuple2<>(value1.f0, 
value1.f1 + value2.f1);
-                                       }
-                               },
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               inputType.createSerializer(new 
ExecutionConfig()),
-                               3000,
-                               3000);
-
-               TestTimeServiceProvider testTimeProvider = new 
TestTimeServiceProvider();
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new OneInputStreamOperatorTestHarness<>(operator, new 
ExecutionConfig(), testTimeProvider);
-
-               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               testHarness.open();
-
-               testTimeProvider.setCurrentTime(3);
-
-               // timestamp is ignored in processing time
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), Long.MAX_VALUE));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 7000));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-
-               // do a snapshot, close and restore again
-               StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-               testHarness.snaphotToFile(snapshot, 
"src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot");
-               testHarness.close();
-
-               */
-
-               final int WINDOW_SIZE = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
-                               new SumReducer(),
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-                               ProcessingTimeTrigger.create(),
-                               0,
-                               null /* late data output tag */,
-                               LegacyWindowOperatorType.FAST_AGGREGATING);
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               testHarness.setup();
-               
testHarness.initializeStateFromLegacyCheckpoint("src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot");
-               testHarness.open();
-
-               testHarness.setProcessingTime(5000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
2999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-
-               testHarness.setProcessingTime(7000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
5999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void 
testRestoreAccumulatingAlignedProcessingTimeWindowsFromFlink11() throws 
Exception {
-               /*
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               AccumulatingProcessingTimeWindowOperator<String, Tuple2<String, 
Integer>, Tuple2<String, Integer>> operator =
-                       new AccumulatingProcessingTimeWindowOperator<>(
-                               new WindowFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, String, TimeWindow>() {
-
-                                       private static final long 
serialVersionUID = 6551516443265733803L;
-
-                                       @Override
-                                       public void apply(String s, TimeWindow 
window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, 
Integer>> out) throws Exception {
-                                               int sum = 0;
-                                               for (Tuple2<String, Integer> 
anInput : input) {
-                                                       sum += anInput.f1;
-                                               }
-                                               out.collect(new Tuple2<>(s, 
sum));
-                                       }
-                               },
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               inputType.createSerializer(new 
ExecutionConfig()),
-                               3000,
-                               3000);
-
-               TestTimeServiceProvider testTimeProvider = new 
TestTimeServiceProvider();
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                       new OneInputStreamOperatorTestHarness<>(operator, new 
ExecutionConfig(), testTimeProvider);
-
-               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               testHarness.open();
-
-               testTimeProvider.setCurrentTime(3);
-
-               // timestamp is ignored in processing time
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), Long.MAX_VALUE));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 7000));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-
-               // do a snapshot, close and restore again
-               StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-               testHarness.snaphotToFile(snapshot, 
"src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot");
-               testHarness.close();
-
-               */
-               final int WINDOW_SIZE = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
-                               new SumReducer(),
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-                               ProcessingTimeTrigger.create(),
-                               0,
-                               null /* late data output tag */,
-                               LegacyWindowOperatorType.FAST_ACCUMULATING);
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               testHarness.setup();
-               
testHarness.initializeStateFromLegacyCheckpoint("src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot");
-               testHarness.open();
-
-               testHarness.setProcessingTime(5000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
2999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-
-               testHarness.setProcessingTime(7000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
5999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.close();
-       }
-
-
-       private static class TupleKeySelector implements 
KeySelector<Tuple2<String, Integer>, String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String getKey(Tuple2<String, Integer> value) throws 
Exception {
-                       return value.f0;
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       private static class Tuple2ResultSortComparator implements 
Comparator<Object> {
-               @Override
-               public int compare(Object o1, Object o2) {
-                       if (o1 instanceof Watermark || o2 instanceof Watermark) 
{
-                               return 0;
-                       } else {
-                               StreamRecord<Tuple2<String, Integer>> sr0 = 
(StreamRecord<Tuple2<String, Integer>>) o1;
-                               StreamRecord<Tuple2<String, Integer>> sr1 = 
(StreamRecord<Tuple2<String, Integer>>) o2;
-                               if (sr0.getTimestamp() != sr1.getTimestamp()) {
-                                       return (int) (sr0.getTimestamp() - 
sr1.getTimestamp());
-                               }
-                               int comparison = 
sr0.getValue().f0.compareTo(sr1.getValue().f0);
-                               if (comparison != 0) {
-                                       return comparison;
-                               } else {
-                                       return sr0.getValue().f1 - 
sr1.getValue().f1;
-                               }
-                       }
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       private static class Tuple3ResultSortComparator implements 
Comparator<Object> {
-               @Override
-               public int compare(Object o1, Object o2) {
-                       if (o1 instanceof Watermark || o2 instanceof Watermark) 
{
-                               return 0;
-                       } else {
-                               StreamRecord<Tuple3<String, Long, Long>> sr0 = 
(StreamRecord<Tuple3<String, Long, Long>>) o1;
-                               StreamRecord<Tuple3<String, Long, Long>> sr1 = 
(StreamRecord<Tuple3<String, Long, Long>>) o2;
-                               if (sr0.getTimestamp() != sr1.getTimestamp()) {
-                                       return (int) (sr0.getTimestamp() - 
sr1.getTimestamp());
-                               }
-                               int comparison = 
sr0.getValue().f0.compareTo(sr1.getValue().f0);
-                               if (comparison != 0) {
-                                       return comparison;
-                               } else {
-                                       comparison = (int) (sr0.getValue().f1 - 
sr1.getValue().f1);
-                                       if (comparison != 0) {
-                                               return comparison;
-                                       }
-                                       return (int) (sr0.getValue().f2 - 
sr1.getValue().f2);
-                               }
-                       }
-               }
-       }
-
-       public static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
-                               Tuple2<String, Integer> value2) throws 
Exception {
-                       return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
-               }
-       }
-
-       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> 
{
-               private static final long serialVersionUID = 1L;
-
-               private boolean openCalled = false;
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-                       openCalled = true;
-               }
-
-               @Override
-               public void close() throws Exception {
-                       super.close();
-               }
-
-               @Override
-               public void apply(String key,
-                               W window,
-                               Iterable<Tuple2<String, Integer>> input,
-                               Collector<Tuple2<String, Integer>> out) throws 
Exception {
-
-                       if (!openCalled) {
-                               fail("Open was not called");
-                       }
-                       int sum = 0;
-
-                       for (Tuple2<String, Integer> t: input) {
-                               sum += t.f1;
-                       }
-                       out.collect(new Tuple2<>(key, sum));
-
-               }
-
-       }
-
-       public static class SessionWindowFunction implements 
WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, 
TimeWindow> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void apply(String key,
-                               TimeWindow window,
-                               Iterable<Tuple2<String, Integer>> values,
-                               Collector<Tuple3<String, Long, Long>> out) 
throws Exception {
-                       int sum = 0;
-                       for (Tuple2<String, Integer> i: values) {
-                               sum += i.f1;
-                       }
-                       String resultString = key + "-" + sum;
-                       out.collect(new Tuple3<>(resultString, 
window.getStart(), window.getEnd()));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
 
b/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
new file mode 100644
index 0000000..d182bee
Binary files /dev/null and 
b/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
 
b/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
new file mode 100644
index 0000000..83741c1
Binary files /dev/null and 
b/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
 
b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
new file mode 100644
index 0000000..3411df6
Binary files /dev/null and 
b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
 
b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
new file mode 100644
index 0000000..060397f
Binary files /dev/null and 
b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
 
b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
new file mode 100644
index 0000000..dad4630
Binary files /dev/null and 
b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
 
b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
new file mode 100644
index 0000000..0edf25e
Binary files /dev/null and 
b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
 
b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
new file mode 100644
index 0000000..d0afbe0
Binary files /dev/null and 
b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
 
b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
new file mode 100644
index 0000000..7d0b6cc
Binary files /dev/null and 
b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
 differ

Reply via email to