http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index 9f19064..d7df479 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
 import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
@@ -84,7 +83,7 @@ public class WindowOperatorMigrationTest {
 
        @Parameterized.Parameters(name = "Migration Savepoint: {0}")
        public static Collection<MigrationVersion> parameters () {
-               return Arrays.asList(MigrationVersion.v1_1, 
MigrationVersion.v1_2, MigrationVersion.v1_3);
+               return Arrays.asList(MigrationVersion.v1_2, 
MigrationVersion.v1_3);
        }
 
        /**
@@ -753,219 +752,6 @@ public class WindowOperatorMigrationTest {
                testHarness.close();
        }
 
-       /**
-        * Manually run this to write binary snapshot data.
-        */
-       @Ignore
-       @Test
-       public void writeAggregatingAlignedProcessingTimeWindowsSnapshot() 
throws Exception {
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               AggregatingProcessingTimeWindowOperator<String, Tuple2<String, 
Integer>> operator =
-                       new AggregatingProcessingTimeWindowOperator<>(
-                               new ReduceFunction<Tuple2<String, Integer>>() {
-                                       private static final long 
serialVersionUID = -8913160567151867987L;
-
-                                       @Override
-                                       public Tuple2<String, Integer> 
reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws 
Exception {
-                                               return new Tuple2<>(value1.f0, 
value1.f1 + value2.f1);
-                                       }
-                               },
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               inputType.createSerializer(new 
ExecutionConfig()),
-                               3000,
-                               3000);
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.open();
-
-               testHarness.setProcessingTime(3);
-
-               // timestamp is ignored in processing time
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), Long.MAX_VALUE));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 7000));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-
-               // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-               OperatorSnapshotUtil.writeStateHandle(
-                       snapshot,
-                       
"src/test/resources/win-op-migration-test-aggr-aligned-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
-               testHarness.close();
-       }
-
-       @Test
-       public void testRestoreAggregatingAlignedProcessingTimeWindows() throws 
Exception {
-               final int windowSize = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
-                               new SumReducer(),
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-                               ProcessingTimeTrigger.create(),
-                               0,
-                               null /* late data output tag */,
-                               LegacyWindowOperatorType.FAST_AGGREGATING);
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               testHarness.setup();
-
-               MigrationTestUtil.restoreFromSnapshot(
-                       testHarness,
-                       OperatorSnapshotUtil.getResourceFilename(
-                               "win-op-migration-test-aggr-aligned-flink" + 
testMigrateVersion + "-snapshot"),
-                       testMigrateVersion);
-
-               testHarness.open();
-
-               testHarness.setProcessingTime(5000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
2999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-
-               testHarness.setProcessingTime(7000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
5999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.close();
-       }
-
-       /**
-        * Manually run this to write binary snapshot data.
-        */
-       @Ignore
-       @Test
-       public void writeAlignedProcessingTimeWindowsSnapshot() throws 
Exception {
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               AccumulatingProcessingTimeWindowOperator<String, Tuple2<String, 
Integer>, Tuple2<String, Integer>> operator =
-                       new AccumulatingProcessingTimeWindowOperator<>(
-                                       new 
InternalIterableWindowFunction<>(new WindowFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, String, TimeWindow>() {
-
-                                               private static final long 
serialVersionUID = 6551516443265733803L;
-
-                                               @Override
-                                               public void apply(String s, 
TimeWindow window, Iterable<Tuple2<String, Integer>> input, 
Collector<Tuple2<String, Integer>> out) throws Exception {
-                                                       int sum = 0;
-                                                       for (Tuple2<String, 
Integer> anInput : input) {
-                                                               sum += 
anInput.f1;
-                                                       }
-                                                       out.collect(new 
Tuple2<>(s, sum));
-                                               }
-                                       }),
-                                       new TupleKeySelector(),
-                                       
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                                       inputType.createSerializer(new 
ExecutionConfig()),
-                                       3000,
-                                       3000);
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.open();
-
-               testHarness.setProcessingTime(3);
-
-               // timestamp is ignored in processing time
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), Long.MAX_VALUE));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 7000));
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-
-               // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-               OperatorSnapshotUtil.writeStateHandle(
-                       snapshot,
-                       
"src/test/resources/win-op-migration-test-accum-aligned-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
-               testHarness.close();
-       }
-
-       @Test
-       public void testRestoreAccumulatingAlignedProcessingTimeWindows() 
throws Exception {
-               final int windowSize = 3;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
-                               new SumReducer(),
-                               inputType.createSerializer(new 
ExecutionConfig()));
-
-               WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-                               
TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
-                               new TimeWindow.Serializer(),
-                               new TupleKeySelector(),
-                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               stateDesc,
-                               new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
-                               ProcessingTimeTrigger.create(),
-                               0,
-                               null /* late data output tag */,
-                               LegacyWindowOperatorType.FAST_ACCUMULATING);
-
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               testHarness.setup();
-
-               MigrationTestUtil.restoreFromSnapshot(
-                       testHarness,
-                       OperatorSnapshotUtil.getResourceFilename(
-                               "win-op-migration-test-accum-aligned-flink" + 
testMigrateVersion + "-snapshot"),
-                       testMigrateVersion);
-
-               testHarness.open();
-
-               testHarness.setProcessingTime(5000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 
2999));
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
2999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), 7000));
-
-               testHarness.setProcessingTime(7000);
-
-               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 
5999));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
-
-               testHarness.close();
-       }
-
        private static class TupleKeySelector implements 
KeySelector<Tuple2<String, Integer>, String> {
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 8748ed4..821438e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -63,7 +63,6 @@ import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -336,32 +335,6 @@ public class WindowTranslationTest {
                processElementAndEnsureOutput(winOperator, 
winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new 
Tuple2<>("hello", 1));
        }
 
-
-       /**
-        * Ignored because we currently don't have the fast processing-time 
window operator.
-        */
-       @Test
-       @SuppressWarnings("rawtypes")
-       @Ignore
-       public void testReduceFastProcessingTime() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-               DataStream<Tuple2<String, Integer>> window = source
-                               .keyBy(new TupleKeySelector())
-                               
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
-                               .reduce(new DummyReducer());
-
-               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform =
-                               (OneInputTransformation<Tuple2<String, 
Integer>, Tuple2<String, Integer>>) window.getTransformation();
-               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator = transform.getOperator();
-               Assert.assertTrue(operator instanceof 
AggregatingProcessingTimeWindowOperator);
-
-               processElementAndEnsureOutput(operator, null, 
BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
-       }
-
        @Test
        @SuppressWarnings("rawtypes")
        public void testReduceWithWindowFunctionEventTime() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 14ae733..f73499c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -64,7 +64,6 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -75,7 +74,6 @@ import org.junit.Test;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
@@ -106,12 +104,6 @@ public class InterruptSensitiveRestoreTest {
        private static final int OPERATOR_RAW = 1;
        private static final int KEYED_MANAGED = 2;
        private static final int KEYED_RAW = 3;
-       private static final int LEGACY = 4;
-
-       @Test
-       public void testRestoreWithInterruptLegacy() throws Exception {
-               testRestoreWithInterrupt(LEGACY);
-       }
 
        @Test
        public void testRestoreWithInterruptOperatorManaged() throws Exception {
@@ -137,18 +129,15 @@ public class InterruptSensitiveRestoreTest {
 
                IN_RESTORE_LATCH.reset();
                Configuration taskConfig = new Configuration();
-               StreamConfig streamConfig = new StreamConfig(taskConfig);
-               
streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+               StreamConfig cfg = new StreamConfig(taskConfig);
+               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                switch (mode) {
                        case OPERATOR_MANAGED:
                        case OPERATOR_RAW:
                        case KEYED_MANAGED:
                        case KEYED_RAW:
-                               
streamConfig.setStateKeySerializer(IntSerializer.INSTANCE);
-                               streamConfig.setStreamOperator(new 
StreamSource<>(new TestSource()));
-                               break;
-                       case LEGACY:
-                               streamConfig.setStreamOperator(new 
StreamSource<>(new TestSourceLegacy()));
+                               
cfg.setStateKeySerializer(IntSerializer.INSTANCE);
+                               cfg.setStreamOperator(new StreamSource<>(new 
TestSource(mode)));
                                break;
                        default:
                                throw new IllegalArgumentException();
@@ -156,7 +145,7 @@ public class InterruptSensitiveRestoreTest {
 
                StreamStateHandle lockingHandle = new 
InterruptLockingStateHandle();
 
-               Task task = createTask(streamConfig, taskConfig, lockingHandle, 
mode);
+               Task task = createTask(cfg, taskConfig, lockingHandle, mode);
 
                // start the task and wait until it is in "restore"
                task.startTaskThread();
@@ -180,16 +169,15 @@ public class InterruptSensitiveRestoreTest {
        // 
------------------------------------------------------------------------
 
        private static Task createTask(
-               StreamConfig streamConfig,
-               Configuration taskConfig,
-               StreamStateHandle state,
-               int mode) throws IOException {
+                       StreamConfig streamConfig,
+                       Configuration taskConfig,
+                       StreamStateHandle state,
+                       int mode) throws IOException {
 
                NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
                
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
-               StreamStateHandle operatorState = null;
                Collection<KeyedStateHandle> keyedStateFromBackend = 
Collections.emptyList();
                Collection<KeyedStateHandle> keyedStateFromStream = 
Collections.emptyList();
                Collection<OperatorStateHandle> operatorStateBackend = 
Collections.emptyList();
@@ -206,7 +194,7 @@ public class InterruptSensitiveRestoreTest {
                                Collections.singletonList(new 
OperatorStateHandle(operatorStateMetadata, state));
 
                List<KeyedStateHandle> keyedStateHandles =
-                               Collections.<KeyedStateHandle>singletonList(new 
KeyGroupsStateHandle(keyGroupRangeOffsets, state));
+                               Collections.singletonList(new 
KeyGroupsStateHandle(keyGroupRangeOffsets, state));
 
                switch (mode) {
                        case OPERATOR_MANAGED:
@@ -221,15 +209,11 @@ public class InterruptSensitiveRestoreTest {
                        case KEYED_RAW:
                                keyedStateFromStream = keyedStateHandles;
                                break;
-                       case LEGACY:
-                               operatorState = state;
-                               break;
                        default:
                                throw new IllegalArgumentException();
                }
 
                OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(
-                       operatorState,
                        operatorStateBackend,
                        operatorStateStream,
                        keyedStateFromBackend,
@@ -238,14 +222,13 @@ public class InterruptSensitiveRestoreTest {
                JobVertexID jobVertexID = new JobVertexID();
                OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
                streamConfig.setOperatorID(operatorID);
-
                TaskStateSnapshot stateSnapshot = new TaskStateSnapshot();
                stateSnapshot.putSubtaskStateByOperatorID(operatorID, 
operatorSubtaskState);
                JobInformation jobInformation = new JobInformation(
                        new JobID(),
                        "test job name",
                        new SerializedValue<>(new ExecutionConfig()),
-                       taskConfig,
+                       new Configuration(),
                        Collections.<BlobKey>emptyList(),
                        Collections.<URL>emptyList());
 
@@ -302,11 +285,11 @@ public class InterruptSensitiveRestoreTest {
                        FSDataInputStream is = new FSDataInputStream() {
 
                                @Override
-                               public void seek(long desired) throws 
IOException {
+                               public void seek(long desired) {
                                }
 
                                @Override
-                               public long getPos() throws IOException {
+                               public long getPos() {
                                        return 0;
                                }
 
@@ -358,33 +341,15 @@ public class InterruptSensitiveRestoreTest {
 
        // 
------------------------------------------------------------------------
 
-       private static class TestSourceLegacy implements 
SourceFunction<Object>, Checkpointed<Serializable> {
+       private static class TestSource implements SourceFunction<Object>, 
CheckpointedFunction {
                private static final long serialVersionUID = 1L;
+               private final int testType;
 
-               @Override
-               public void run(SourceContext<Object> ctx) throws Exception {
-                       fail("should never be called");
+               public TestSource(int testType) {
+                       this.testType = testType;
                }
 
                @Override
-               public void cancel() {}
-
-               @Override
-               public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       fail("should never be called");
-                       return null;
-               }
-
-               @Override
-               public void restoreState(Serializable state) throws Exception {
-                       fail("should never be called");
-               }
-       }
-
-       private static class TestSource implements SourceFunction<Object>, 
CheckpointedFunction {
-               private static final long serialVersionUID = 1L;
-
-               @Override
                public void run(SourceContext<Object> ctx) throws Exception {
                        fail("should never be called");
                }
@@ -399,6 +364,8 @@ public class InterruptSensitiveRestoreTest {
 
                @Override
                public void initializeState(FunctionInitializationContext 
context) throws Exception {
+                       // raw keyed state is already read by the timer service and all other state while initializing the context,
+                       // so only the read of the raw operator state has to be triggered manually here.
                        ((StateInitializationContext) 
context).getRawOperatorStateInputs().iterator().next().getStream().read();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 3190620..8d80d66 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -28,8 +28,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -47,21 +45,18 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -69,7 +64,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -78,7 +72,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -93,7 +86,7 @@ import static org.junit.Assert.fail;
 public class OneInputStreamTaskTest extends TestLogger {
 
        private static final ListStateDescriptor<Integer> TEST_DESCRIPTOR =
-                       new ListStateDescriptor<>("test", new IntSerializer());
+               new ListStateDescriptor<>("test", new IntSerializer());
 
        /**
         * This test verifies that open() and close() are correctly called. 
This test also verifies
@@ -129,8 +122,8 @@ public class OneInputStreamTaskTest extends TestLogger {
                assertTrue("RichFunction methods where not called.", 
TestOpenCloseMapFunction.closeCalled);
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.",
-                               expectedOutput,
-                               testHarness.getOutput());
+                       expectedOutput,
+                       testHarness.getOutput());
        }
 
        /**
@@ -174,8 +167,8 @@ public class OneInputStreamTaskTest extends TestLogger {
                testHarness.waitForInputProcessing();
                expectedOutput.add(new Watermark(initialTime));
                TestHarnessUtil.assertOutputEquals("Output was not correct.",
-                               expectedOutput,
-                               testHarness.getOutput());
+                       expectedOutput,
+                       testHarness.getOutput());
 
                // contrary to checkpoint barriers these elements are not 
blocked by watermarks
                testHarness.processElement(new StreamRecord<String>("Hello", 
initialTime));
@@ -215,7 +208,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                testHarness.processElement(new Watermark(initialTime + 6), 0, 
0);
                testHarness.processElement(new Watermark(initialTime + 5), 1, 
1); // this watermark should be advanced first
                testHarness.processElement(StreamStatus.IDLE, 1, 1); // once 
this is acknowledged,
-                                                                    // 
watermark (initial + 6) should be forwarded
+               // watermark (initial + 6) should be forwarded
                testHarness.waitForInputProcessing();
                expectedOutput.add(new Watermark(initialTime + 5));
                expectedOutput.add(new Watermark(initialTime + 6));
@@ -263,21 +256,16 @@ public class OneInputStreamTaskTest extends TestLogger {
                // ------------------ setup the chain ------------------
 
                TriggerableFailOnWatermarkTestOperator headOperator = new 
TriggerableFailOnWatermarkTestOperator();
-               OperatorID headOperatorId = new OperatorID();
-
                StreamConfig headOperatorConfig = testHarness.getStreamConfig();
 
                WatermarkGeneratingTestOperator watermarkOperator = new 
WatermarkGeneratingTestOperator();
-               OperatorID watermarkOperatorId = new OperatorID();
-
                StreamConfig watermarkOperatorConfig = new StreamConfig(new 
Configuration());
 
                TriggerableFailOnWatermarkTestOperator tailOperator = new 
TriggerableFailOnWatermarkTestOperator();
-               OperatorID tailOperatorId = new OperatorID();
                StreamConfig tailOperatorConfig = new StreamConfig(new 
Configuration());
 
                headOperatorConfig.setStreamOperator(headOperator);
-               headOperatorConfig.setOperatorID(headOperatorId);
+               headOperatorConfig.setOperatorID(new OperatorID(42L, 42L));
                headOperatorConfig.setChainStart();
                headOperatorConfig.setChainIndex(0);
                
headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
@@ -290,7 +278,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                )));
 
                watermarkOperatorConfig.setStreamOperator(watermarkOperator);
-               watermarkOperatorConfig.setOperatorID(watermarkOperatorId);
+               watermarkOperatorConfig.setOperatorID(new OperatorID(4711L, 
42L));
                
watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
                watermarkOperatorConfig.setChainIndex(1);
                
watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new 
StreamEdge(
@@ -312,7 +300,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                        null));
 
                tailOperatorConfig.setStreamOperator(tailOperator);
-               tailOperatorConfig.setOperatorID(tailOperatorId);
+               tailOperatorConfig.setOperatorID(new OperatorID(123L, 123L));
                
tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
                tailOperatorConfig.setBufferTimeout(0);
                tailOperatorConfig.setChainIndex(2);
@@ -555,13 +543,11 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                long checkpointId = 1L;
                long checkpointTimestamp = 1L;
-               long recoveryTimestamp = 3L;
-               long seed = 2L;
                int numberChainedTasks = 11;
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
 
-               configureChainedTestingStreamOperator(streamConfig, 
numberChainedTasks, seed, recoveryTimestamp);
+               configureChainedTestingStreamOperator(streamConfig, 
numberChainedTasks);
 
                AcknowledgeStreamMockEnvironment env = new 
AcknowledgeStreamMockEnvironment(
                        testHarness.jobConfig,
@@ -599,7 +585,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                StreamConfig restoredTaskStreamConfig = 
restoredTaskHarness.getStreamConfig();
 
-               configureChainedTestingStreamOperator(restoredTaskStreamConfig, 
numberChainedTasks, seed, recoveryTimestamp);
+               configureChainedTestingStreamOperator(restoredTaskStreamConfig, 
numberChainedTasks);
 
                TaskStateSnapshot stateHandles = 
env.getCheckpointStateHandles();
                Assert.assertEquals(numberChainedTasks, 
stateHandles.getSubtaskStateMappings().size());
@@ -625,16 +611,12 @@ public class OneInputStreamTaskTest extends TestLogger {
 
        private void configureChainedTestingStreamOperator(
                StreamConfig streamConfig,
-               int numberChainedTasks,
-               long seed,
-               long recoveryTimestamp) {
+               int numberChainedTasks) {
 
                Preconditions.checkArgument(numberChainedTasks >= 1, "The 
operator chain must at least " +
                        "contain one operator.");
 
-               Random random = new Random(seed);
-
-               TestingStreamOperator<Integer, Integer> previousOperator = new 
TestingStreamOperator<>(random.nextLong(), recoveryTimestamp);
+               TestingStreamOperator<Integer, Integer> previousOperator = new 
TestingStreamOperator<>();
                streamConfig.setStreamOperator(previousOperator);
                streamConfig.setOperatorID(new OperatorID(0L, 0L));
 
@@ -643,7 +625,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                List<StreamEdge> outputEdges = new 
ArrayList<>(numberChainedTasks - 1);
 
                for (int chainedIndex = 1; chainedIndex < numberChainedTasks; 
chainedIndex++) {
-                       TestingStreamOperator<Integer, Integer> chainedOperator 
= new TestingStreamOperator<>(random.nextLong(), recoveryTimestamp);
+                       TestingStreamOperator<Integer, Integer> chainedOperator 
= new TestingStreamOperator<>();
                        StreamConfig chainedConfig = new StreamConfig(new 
Configuration());
                        chainedConfig.setStreamOperator(chainedOperator);
                        chainedConfig.setOperatorID(new OperatorID(0L, 
chainedIndex));
@@ -702,17 +684,17 @@ public class OneInputStreamTaskTest extends TestLogger {
                }
 
                AcknowledgeStreamMockEnvironment(
-                               Configuration jobConfig, Configuration 
taskConfig,
-                               ExecutionConfig executionConfig, long 
memorySize,
-                               MockInputSplitProvider inputSplitProvider, int 
bufferSize) {
+                       Configuration jobConfig, Configuration taskConfig,
+                       ExecutionConfig executionConfig, long memorySize,
+                       MockInputSplitProvider inputSplitProvider, int 
bufferSize) {
                        super(jobConfig, taskConfig, executionConfig, 
memorySize, inputSplitProvider, bufferSize);
                }
 
                @Override
                public void acknowledgeCheckpoint(
-                               long checkpointId,
-                               CheckpointMetrics checkpointMetrics,
-                               TaskStateSnapshot checkpointStateHandles) {
+                       long checkpointId,
+                       CheckpointMetrics checkpointMetrics,
+                       TaskStateSnapshot checkpointStateHandles) {
 
                        this.checkpointId = checkpointId;
                        this.checkpointStateHandles = checkpointStateHandles;
@@ -729,19 +711,14 @@ public class OneInputStreamTaskTest extends TestLogger {
        }
 
        private static class TestingStreamOperator<IN, OUT>
-                       extends AbstractStreamOperator<OUT>
-                       implements OneInputStreamOperator<IN, OUT>, 
StreamCheckpointedOperator {
+               extends AbstractStreamOperator<OUT>
+               implements OneInputStreamOperator<IN, OUT> {
 
                private static final long serialVersionUID = 
774614855940397174L;
 
                public static int numberRestoreCalls = 0;
                public static int numberSnapshotCalls = 0;
 
-               private final long seed;
-               private final long recoveryTimestamp;
-
-               private transient Random random;
-
                @Override
                public void open() throws Exception {
                        super.open();
@@ -767,7 +744,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                @Override
                public void snapshotState(StateSnapshotContext context) throws 
Exception {
                        ListState<Integer> partitionableState =
-                                       
getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
+                               
getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
                        partitionableState.clear();
 
                        partitionableState.add(42);
@@ -778,59 +755,14 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                @Override
                public void initializeState(StateInitializationContext context) 
throws Exception {
-
-               }
-
-               TestingStreamOperator(long seed, long recoveryTimestamp) {
-                       this.seed = seed;
-                       this.recoveryTimestamp = recoveryTimestamp;
-               }
-
-               @Override
-               public void processElement(StreamRecord<IN> element) throws 
Exception {
-
-               }
-
-               @Override
-               public void snapshotState(FSDataOutputStream out, long 
checkpointId, long timestamp) throws Exception {
-                       if (random == null) {
-                               random = new Random(seed);
+                       if (context.isRestored()) {
+                               ++numberRestoreCalls;
                        }
-
-                       Serializable functionState = generateFunctionState();
-                       Integer operatorState = generateOperatorState();
-
-                       InstantiationUtil.serializeObject(out, functionState);
-                       InstantiationUtil.serializeObject(out, operatorState);
                }
 
                @Override
-               public void restoreState(FSDataInputStream in) throws Exception 
{
-                       numberRestoreCalls++;
-
-                       if (random == null) {
-                               random = new Random(seed);
-                       }
-
-                       assertEquals(this.recoveryTimestamp, recoveryTimestamp);
-
-                       assertNotNull(in);
-
-                       ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
-
-                       Serializable functionState = 
InstantiationUtil.deserializeObject(in, cl);
-                       Integer operatorState = 
InstantiationUtil.deserializeObject(in, cl);
-
-                       assertEquals(random.nextInt(), functionState);
-                       assertEquals(random.nextInt(), (int) operatorState);
-               }
-
-               private Serializable generateFunctionState() {
-                       return random.nextInt();
-               }
+               public void processElement(StreamRecord<IN> element) throws 
Exception {
 
-               private Integer generateOperatorState() {
-                       return random.nextInt();
                }
        }
 
@@ -913,8 +845,8 @@ public class OneInputStreamTaskTest extends TestLogger {
         * <p>If it receives a watermark when it's not expecting one, it'll 
throw an exception and fail.
         */
        private static class TriggerableFailOnWatermarkTestOperator
-                       extends AbstractStreamOperator<String>
-                       implements OneInputStreamOperator<String, String> {
+               extends AbstractStreamOperator<String>
+               implements OneInputStreamOperator<String, String> {
 
                private static final long serialVersionUID = 
2048954179291813243L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 08c3207..a2dc6c4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -81,7 +81,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -139,7 +138,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
@@ -307,9 +305,9 @@ public class StreamTaskTest extends TestLogger {
                streamTask.setEnvironment(mockEnvironment);
 
                // mock the operators
-               StreamOperator<?> streamOperator1 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-               StreamOperator<?> streamOperator2 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-               StreamOperator<?> streamOperator3 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+               StreamOperator<?> streamOperator1 = mock(StreamOperator.class);
+               StreamOperator<?> streamOperator2 = mock(StreamOperator.class);
+               StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
                // mock the returned snapshots
                OperatorSnapshotResult operatorSnapshotResult1 = 
mock(OperatorSnapshotResult.class);
@@ -321,15 +319,6 @@ public class StreamTaskTest extends TestLogger {
                when(streamOperator2.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
                when(streamOperator3.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenThrow(testException);
 
-               // mock the returned legacy snapshots
-               StreamStateHandle streamStateHandle1 = 
mock(StreamStateHandle.class);
-               StreamStateHandle streamStateHandle2 = 
mock(StreamStateHandle.class);
-               StreamStateHandle streamStateHandle3 = 
mock(StreamStateHandle.class);
-
-               when(streamOperator1.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
-               when(streamOperator2.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
-               when(streamOperator3.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
-
                OperatorID operatorID1 = new OperatorID();
                OperatorID operatorID2 = new OperatorID();
                OperatorID operatorID3 = new OperatorID();
@@ -359,10 +348,6 @@ public class StreamTaskTest extends TestLogger {
 
                verify(operatorSnapshotResult1).cancel();
                verify(operatorSnapshotResult2).cancel();
-
-               verify(streamStateHandle1).discardState();
-               verify(streamStateHandle2).discardState();
-               verify(streamStateHandle3).discardState();
        }
 
        /**
@@ -384,12 +369,12 @@ public class StreamTaskTest extends TestLogger {
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
                streamTask.setEnvironment(mockEnvironment);
 
-               StreamOperator<?> streamOperator1 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-               StreamOperator<?> streamOperator2 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-               StreamOperator<?> streamOperator3 = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
-
-               // mock the new state handles / futures
+               // mock the operators
+               StreamOperator<?> streamOperator1 = mock(StreamOperator.class);
+               StreamOperator<?> streamOperator2 = mock(StreamOperator.class);
+               StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
+               // mock the new state operator snapshots
                OperatorSnapshotResult operatorSnapshotResult1 = 
mock(OperatorSnapshotResult.class);
                OperatorSnapshotResult operatorSnapshotResult2 = 
mock(OperatorSnapshotResult.class);
                OperatorSnapshotResult operatorSnapshotResult3 = 
mock(OperatorSnapshotResult.class);
@@ -403,15 +388,6 @@ public class StreamTaskTest extends TestLogger {
                when(streamOperator2.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
                when(streamOperator3.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3);
 
-               // mock the legacy state snapshot
-               StreamStateHandle streamStateHandle1 = 
mock(StreamStateHandle.class);
-               StreamStateHandle streamStateHandle2 = 
mock(StreamStateHandle.class);
-               StreamStateHandle streamStateHandle3 = 
mock(StreamStateHandle.class);
-
-               when(streamOperator1.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
-               when(streamOperator2.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
-               when(streamOperator3.snapshotLegacyOperatorState(anyLong(), 
anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
-
                OperatorID operatorID1 = new OperatorID();
                OperatorID operatorID2 = new OperatorID();
                OperatorID operatorID3 = new OperatorID();
@@ -438,10 +414,6 @@ public class StreamTaskTest extends TestLogger {
                verify(operatorSnapshotResult1).cancel();
                verify(operatorSnapshotResult2).cancel();
                verify(operatorSnapshotResult3).cancel();
-
-               verify(streamStateHandle1).discardState();
-               verify(streamStateHandle2).discardState();
-               verify(streamStateHandle3).discardState();
        }
 
        /**
@@ -481,7 +453,7 @@ public class StreamTaskTest extends TestLogger {
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
                streamTask.setEnvironment(mockEnvironment);
 
-               StreamOperator<?> streamOperator = mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+               StreamOperator<?> streamOperator = mock(StreamOperator.class);
 
                KeyedStateHandle managedKeyedStateHandle = 
mock(KeyedStateHandle.class);
                KeyedStateHandle rawKeyedStateHandle = 
mock(KeyedStateHandle.class);
@@ -581,7 +553,6 @@ public class StreamTaskTest extends TestLogger {
 
                whenNew(OperatorSubtaskState.class).
                        withArguments(
-                               any(StreamStateHandle.class),
                                anyCollectionOf(OperatorStateHandle.class),
                                anyCollectionOf(OperatorStateHandle.class),
                                anyCollectionOf(KeyedStateHandle.class),
@@ -593,11 +564,10 @@ public class StreamTaskTest extends TestLogger {
                                completeSubtask.await();
                                Object[] arguments = invocation.getArguments();
                                return new OperatorSubtaskState(
-                                       (StreamStateHandle) arguments[0],
+                                       (OperatorStateHandle) arguments[0],
                                        (OperatorStateHandle) arguments[1],
-                                       (OperatorStateHandle) arguments[2],
-                                       (KeyedStateHandle) arguments[3],
-                                       (KeyedStateHandle) arguments[4]
+                                       (KeyedStateHandle) arguments[2],
+                                       (KeyedStateHandle) arguments[3]
                                );
                        }
                });
@@ -606,7 +576,7 @@ public class StreamTaskTest extends TestLogger {
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
                streamTask.setEnvironment(mockEnvironment);
 
-               final StreamOperator<?> streamOperator = 
mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+               final StreamOperator<?> streamOperator = 
mock(StreamOperator.class);
                final OperatorID operatorID = new OperatorID();
                when(streamOperator.getOperatorID()).thenReturn(operatorID);
 
@@ -717,7 +687,7 @@ public class StreamTaskTest extends TestLogger {
 
                // mock the operators
                StreamOperator<?> statelessOperator =
-                               mock(StreamOperator.class, 
withSettings().extraInterfaces(StreamCheckpointedOperator.class));
+                               mock(StreamOperator.class);
 
                final OperatorID operatorID = new OperatorID();
                when(statelessOperator.getOperatorID()).thenReturn(operatorID);

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 720346a..9156f34 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -25,11 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import 
org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
-import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.migration.util.MigrationInstantiationUtil;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -41,12 +36,10 @@ import 
org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -54,7 +47,6 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
@@ -72,7 +64,6 @@ import org.apache.flink.util.Preconditions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.FileInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -308,36 +299,6 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                setupCalled = true;
        }
 
-       public void initializeStateFromLegacyCheckpoint(String 
checkpointFilename) throws Exception {
-
-               FileInputStream fin = new FileInputStream(checkpointFilename);
-               StreamTaskState state = 
MigrationInstantiationUtil.deserializeObject(fin, 
ClassLoader.getSystemClassLoader());
-               fin.close();
-
-               if (!setupCalled) {
-                       setup();
-               }
-
-               StreamStateHandle stateHandle = 
SavepointV0Serializer.convertOperatorAndFunctionState(state);
-
-               List<KeyedStateHandle> keyGroupStatesList = new ArrayList<>();
-               if (state.getKvStates() != null) {
-                       KeyGroupsStateHandle keyedStateHandle = 
SavepointV0Serializer.convertKeyedBackendState(
-                                       state.getKvStates(),
-                                       
environment.getTaskInfo().getIndexOfThisSubtask(),
-                                       0);
-                       keyGroupStatesList.add(keyedStateHandle);
-               }
-
-               // finally calling the initializeState() with the legacy 
operatorStateHandles
-               initializeState(new OperatorStateHandles(0,
-                               stateHandle,
-                               keyGroupStatesList,
-                               Collections.<KeyedStateHandle>emptyList(),
-                               Collections.<OperatorStateHandle>emptyList(),
-                               Collections.<OperatorStateHandle>emptyList()));
-       }
-
        /**
         * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorSubtaskState)}.
         * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
@@ -397,7 +358,6 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                                        numSubtasks).get(subtaskIndex);
 
                        OperatorSubtaskState massagedOperatorStateHandles = new 
OperatorSubtaskState(
-                               operatorStateHandles.getLegacyOperatorState(),
                                
nullToEmptyCollection(localManagedOperatorState),
                                nullToEmptyCollection(localRawOperatorState),
                                
nullToEmptyCollection(localManagedKeyGroupState),
@@ -473,7 +433,6 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
 
                return new OperatorStateHandles(
                        0,
-                       null,
                        mergedManagedKeyedState,
                        mergedRawKeyedState,
                        mergedManagedOperatorState,
@@ -497,8 +456,6 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         */
        public OperatorStateHandles snapshot(long checkpointId, long timestamp) 
throws Exception {
 
-               CheckpointStreamFactory streamFactory = 
stateBackend.createStreamFactory(new JobID(), "test_op");
-
                OperatorSnapshotResult operatorStateResult = 
operator.snapshotState(
                        checkpointId,
                        timestamp,
@@ -510,21 +467,8 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                OperatorStateHandle opManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
                OperatorStateHandle opRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
-               // also snapshot legacy state, if any
-               StreamStateHandle legacyStateHandle = null;
-
-               if (operator instanceof StreamCheckpointedOperator) {
-
-                       final 
CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-                                       
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
-
-                               ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
-                               legacyStateHandle = 
outStream.closeAndGetHandle();
-               }
-
                return new OperatorStateHandles(
                        0,
-                       legacyStateHandle,
                        keyedManaged != null ? 
Collections.singletonList(keyedManaged) : null,
                        keyedRaw != null ? Collections.singletonList(keyedRaw) 
: null,
                        opManaged != null ? 
Collections.singletonList(opManaged) : null,
@@ -532,24 +476,6 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
        }
 
        /**
-        * Calls {@link 
StreamCheckpointedOperator#snapshotState(FSDataOutputStream, long, long)} if
-        * the operator implements this interface.
-        */
-       @Deprecated
-       public StreamStateHandle snapshotLegacy(long checkpointId, long 
timestamp) throws Exception {
-
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream = 
stateBackend.createStreamFactory(
-                               new JobID(),
-                               
"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
-               if (operator instanceof StreamCheckpointedOperator) {
-                       ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
-                       return outStream.closeAndGetHandle();
-               } else {
-                       throw new RuntimeException("Operator is not 
StreamCheckpointedOperator");
-               }
-       }
-
-       /**
         * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
 ()}.
         */
        public void notifyOfCompletedCheckpoint(long checkpointId) throws 
Exception {
@@ -557,22 +483,6 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
        }
 
        /**
-        * Calls {@link 
StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
-        * the operator implements this interface.
-        */
-       @Deprecated
-       @SuppressWarnings("deprecation")
-       public void restore(StreamStateHandle snapshot) throws Exception {
-               if (operator instanceof StreamCheckpointedOperator) {
-                       try (FSDataInputStream in = snapshot.openInputStream()) 
{
-                               ((StreamCheckpointedOperator) 
operator).restoreState(in);
-                       }
-               } else {
-                       throw new RuntimeException("Operator is not 
StreamCheckpointedOperator");
-               }
-       }
-
-       /**
         * Calls close and dispose on the operator.
         */
        public void close() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 0d42d9f..c2ec63a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -23,33 +23,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.util.Migration;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.anyInt;
@@ -142,61 +132,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                }
        }
 
-       /**
-        *
-        */
-       @Override
-       public StreamStateHandle snapshotLegacy(long checkpointId, long 
timestamp) throws Exception {
-               // simply use an in-memory handle
-               MemoryStateBackend backend = new MemoryStateBackend();
-
-               CheckpointStreamFactory streamFactory = 
backend.createStreamFactory(new JobID(), "test_op");
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-                               
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
-
-               if (operator instanceof StreamCheckpointedOperator) {
-                       ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
-               }
-
-               if (keyedStateBackend != null) {
-                       RunnableFuture<KeyedStateHandle> keyedSnapshotRunnable 
= keyedStateBackend.snapshot(
-                                       checkpointId,
-                                       timestamp,
-                                       streamFactory,
-                                       CheckpointOptions.forFullCheckpoint());
-                       if (!keyedSnapshotRunnable.isDone()) {
-                               Thread runner = new 
Thread(keyedSnapshotRunnable);
-                               runner.start();
-                       }
-                       outStream.write(1);
-                       ObjectOutputStream oos = new 
ObjectOutputStream(outStream);
-                       oos.writeObject(keyedSnapshotRunnable.get());
-                       oos.flush();
-               } else {
-                       outStream.write(0);
-               }
-               return outStream.closeAndGetHandle();
-       }
-
-       /**
-        *
-        */
-       @Override
-       public void restore(StreamStateHandle snapshot) throws Exception {
-               try (FSDataInputStream inStream = snapshot.openInputStream()) {
-
-                       if (operator instanceof StreamCheckpointedOperator) {
-                               ((StreamCheckpointedOperator) 
operator).restoreState(inStream);
-                       }
-
-                       byte keyedStatePresent = (byte) inStream.read();
-                       if (keyedStatePresent == 1) {
-                               ObjectInputStream ois = new 
ObjectInputStream(inStream);
-                               this.restoredKeyedState = 
Collections.singletonList((KeyedStateHandle) ois.readObject());
-                       }
-               }
-       }
-
        private static boolean hasMigrationHandles(Collection<KeyedStateHandle> 
allKeyGroupsHandles) {
                for (KeyedStateHandle handle : allKeyGroupsHandles) {
                        if (handle instanceof Migration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 7e32723..33f32e9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
 import java.io.DataInputStream;
@@ -53,7 +52,8 @@ public class OperatorSnapshotUtil {
 
                        dos.writeInt(state.getOperatorChainIndex());
 
-                       
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
 dos);
+                       // still required for compatibility
+                       SavepointV1Serializer.serializeStreamStateHandle(null, 
dos);
 
                        Collection<OperatorStateHandle> rawOperatorState = 
state.getRawOperatorState();
                        if (rawOperatorState != null) {
@@ -108,7 +108,8 @@ public class OperatorSnapshotUtil {
                try (DataInputStream dis = new DataInputStream(in)) {
                        int index = dis.readInt();
 
-                       StreamStateHandle legacyState = 
SavepointV1Serializer.deserializeStreamStateHandle(dis);
+                       // still required for compatibility to consume the 
bytes.
+                       SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
                        List<OperatorStateHandle> rawOperatorState = null;
                        int numRawOperatorStates = dis.readInt();
@@ -154,7 +155,12 @@ public class OperatorSnapshotUtil {
                                }
                        }
 
-                       return new OperatorStateHandles(index, legacyState, 
managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+                       return new OperatorStateHandles(
+                               index,
+                               managedKeyedState,
+                               rawKeyedState,
+                               managedOperatorState,
+                               rawOperatorState);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
index f723b34..1c95a04 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
@@ -29,22 +29,16 @@ public class MigrationTestUtil {
        /**
         * Restore from a snapshot taken with an older Flink version.
         *
-        * @param testHarness the test harness to restore the snapshot to.
-        * @param snapshotPath the absolute path to the snapshot.
+        * @param testHarness          the test harness to restore the snapshot 
to.
+        * @param snapshotPath         the absolute path to the snapshot.
         * @param snapshotFlinkVersion the Flink version of the snapshot.
-        *
         * @throws Exception
         */
        public static void restoreFromSnapshot(
-                       AbstractStreamOperatorTestHarness<?> testHarness,
-                       String snapshotPath,
-                       MigrationVersion snapshotFlinkVersion) throws Exception 
{
+               AbstractStreamOperatorTestHarness<?> testHarness,
+               String snapshotPath,
+               MigrationVersion snapshotFlinkVersion) throws Exception {
 
-               if (snapshotFlinkVersion == MigrationVersion.v1_1) {
-                       // Flink 1.1 snapshots should be read using the legacy 
restore method
-                       
testHarness.initializeStateFromLegacyCheckpoint(snapshotPath);
-               } else {
-                       
testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath));
-               }
+               
testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
index 104400f..35a56d7 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -25,15 +25,15 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import 
org.apache.flink.streaming.api.windowing.assigners.{SlidingAlignedProcessingTimeWindows,
 SlidingEventTimeWindows, TumblingAlignedProcessingTimeWindows}
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import 
org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator,
 AggregatingProcessingTimeWindowOperator, WindowOperator}
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 /**
   * These tests verify that the api calls on [[WindowedStream]] that use the 
"time" shortcut
@@ -85,59 +85,6 @@ class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
   }
 
-  /**
-    * These tests ensure that the fast aligned time windows operator is used 
if the
-    * conditions are right.
-    */
-  @Test
-  def testReduceAlignedTimeWindows(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-    
-    val window1 = source
-      .keyBy(0)
-      .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), 
Time.milliseconds(100)))
-      .reduce(new DummyReducer())
-
-    val transform1 = window1.javaStream.getTransformation
-        .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-    
-    val operator1 = transform1.getOperator
-
-    
assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, 
_]])
-  }
-
-  /**
-    * These tests ensure that the fast aligned time windows operator is used 
if the
-    * conditions are right.
-    */
-  @Test
-  def testApplyAlignedTimeWindows(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val window1 = source
-      .keyBy(0)
-      .window(TumblingAlignedProcessingTimeWindows.of(Time.minutes(1)))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
-        def apply(
-                   key: Tuple,
-                   window: TimeWindow,
-                   values: Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
-
-    val transform1 = window1.javaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    
assertTrue(operator1.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, 
_, _]])
-  }
-
   @Test
   def testReduceEventTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index cad6693..99fb6ef 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -975,7 +974,7 @@ public class RescalingITCase extends TestLogger {
                }
        }
 
-       private static class PartitionedStateSource extends StateSourceBase 
implements CheckpointedFunction, CheckpointedRestoring<Integer> {
+       private static class PartitionedStateSource extends StateSourceBase 
implements CheckpointedFunction {
 
                private static final long serialVersionUID = 
-359715965103593462L;
                private static final int NUM_PARTITIONS = 7;
@@ -1030,10 +1029,5 @@ public class RescalingITCase extends TestLogger {
                                
checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
                        }
                }
-
-               @Override
-               public void restoreState(Integer state) throws Exception {
-                       counterPartitions.add(state);
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index cc23545..1b7dafa 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -53,7 +53,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
@@ -351,10 +351,6 @@ public class SavepointITCase extends TestLogger {
                                        OperatorSubtaskState subtaskState = 
operatorState.getState(tdd.getSubtaskIndex());
 
                                        assertNotNull(subtaskState);
-
-                                       errMsg = "Initial operator state 
mismatch.";
-                                       assertEquals(errMsg, 
subtaskState.getLegacyOperatorState(),
-                                               
tdd.getTaskStateHandles().getSubtaskStateByOperatorID(operatorState.getOperatorID()).getLegacyOperatorState());
                                }
                        }
 
@@ -377,17 +373,18 @@ public class SavepointITCase extends TestLogger {
                        assertTrue(errMsg, resp.getClass() == 
getDisposeSavepointSuccess().getClass());
 
                        // - Verification START 
-------------------------------------------
-
                        // The checkpoint files
                        List<File> checkpointFiles = new ArrayList<>();
 
                        for (OperatorState stateForTaskGroup : 
savepoint.getOperatorStates()) {
                                for (OperatorSubtaskState subtaskState : 
stateForTaskGroup.getStates()) {
-                                       StreamStateHandle streamTaskState = 
subtaskState.getLegacyOperatorState();
+                                       Collection<OperatorStateHandle> 
streamTaskState = subtaskState.getManagedOperatorState();
 
-                                       if (streamTaskState != null) {
-                                               FileStateHandle fileStateHandle 
= (FileStateHandle) streamTaskState;
-                                               checkpointFiles.add(new 
File(fileStateHandle.getFilePath().toUri()));
+                                       if (streamTaskState != null && 
!streamTaskState.isEmpty()) {
+                                               for (OperatorStateHandle osh : 
streamTaskState) {
+                                                       FileStateHandle 
fileStateHandle = (FileStateHandle) osh.getDelegateStateHandle();
+                                                       checkpointFiles.add(new 
File(fileStateHandle.getFilePath().toUri()));
+                                               }
                                        }
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 21be7ba..eccc7e9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +40,7 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -64,6 +66,11 @@ import static junit.framework.Assert.fail;
  */
 public class SavepointMigrationTestBase extends TestBaseUtils {
 
+       @BeforeClass
+       public static void before() {
+               SavepointSerializers.setFailWhenLegacyStateDetected(false);
+       }
+
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
 

Reply via email to