http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
deleted file mode 100644
index 16f3769..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
-import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
-import org.apache.flink.migration.runtime.state.KvStateSnapshot;
-import org.apache.flink.migration.runtime.state.memory.MemValueState;
-import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
-import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
-import org.apache.flink.migration.util.MigrationInstantiationUtil;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("deprecation")
-public class MigrationV0ToV1Test {
-
-       @Rule
-       public TemporaryFolder tmp = new TemporaryFolder();
-
-       /**
-        * Simple test of savepoint methods.
-        */
-       @Test
-       public void testSavepointMigrationV0ToV1() throws Exception {
-
-               String target = tmp.getRoot().getAbsolutePath();
-
-               assertEquals(0, tmp.getRoot().listFiles().length);
-
-               long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
-               int numTaskStates = 4;
-               int numSubtaskStates = 16;
-
-               Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> expected =
-                               createTaskStatesOld(numTaskStates, numSubtaskStates);
-
-               SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
-
-               assertEquals(SavepointV0.VERSION, savepoint.getVersion());
-               assertEquals(checkpointId, savepoint.getCheckpointId());
-               assertEquals(expected, savepoint.getOldTaskStates());
-
-               assertFalse(savepoint.getOldTaskStates().isEmpty());
-
-               Exception latestException = null;
-               Path path = null;
-               FSDataOutputStream fdos = null;
-
-               FileSystem fs = null;
-
-               try {
-
-                       // Try to create a FS output stream
-                       for (int attempt = 0; attempt < 10; attempt++) {
-                               path = new Path(target, FileUtils.getRandomFilename("savepoint-"));
-
-                               if (fs == null) {
-                                       fs = FileSystem.get(path.toUri());
-                               }
-
-                               try {
-                                       fdos = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
-                                       break;
-                               } catch (Exception e) {
-                                       latestException = e;
-                               }
-                       }
-
-                       if (fdos == null) {
-                               throw new IOException("Failed to create file output stream at " + path, latestException);
-                       }
-
-                       try (DataOutputStream dos = new DataOutputStream(fdos)) {
-                               dos.writeInt(SavepointStore.MAGIC_NUMBER);
-                               dos.writeInt(savepoint.getVersion());
-                               SavepointV0Serializer.INSTANCE.serializeOld(savepoint, dos);
-                       }
-
-                       ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
-                       Savepoint sp = SavepointStore.loadSavepoint(path.toString(), cl);
-                       int t = 0;
-                       for (TaskState taskState : sp.getTaskStates()) {
-                               for (int p = 0; p < taskState.getParallelism(); ++p) {
-                                       SubtaskState subtaskState = taskState.getState(p);
-                                       ChainedStateHandle<StreamStateHandle> legacyOperatorState = subtaskState.getLegacyOperatorState();
-                                       for (int c = 0; c < legacyOperatorState.getLength(); ++c) {
-                                               StreamStateHandle stateHandle = legacyOperatorState.get(c);
-                                               try (InputStream is = stateHandle.openInputStream()) {
-                                                       Tuple4<Integer, Integer, Integer, Integer> expTestState = new Tuple4<>(0, t, p, c);
-                                                       Tuple4<Integer, Integer, Integer, Integer> actTestState;
-                                                       //check function state
-                                                       if (p % 4 != 0) {
-                                                               assertEquals(1, is.read());
-                                                               actTestState = InstantiationUtil.deserializeObject(is, cl);
-                                                               assertEquals(expTestState, actTestState);
-                                                       } else {
-                                                               assertEquals(0, is.read());
-                                                       }
-
-                                                       //check operator state
-                                                       expTestState.f0 = 1;
-                                                       actTestState = InstantiationUtil.deserializeObject(is, cl);
-                                                       assertEquals(expTestState, actTestState);
-                                               }
-                                       }
-
-                                       //check keyed state
-                                       KeyedStateHandle keyedStateHandle = subtaskState.getManagedKeyedState();
-
-                                       if (t % 3 != 0) {
-
-                                               assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle);
-
-                                               KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
-
-                                               assertEquals(1, keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
-                                               assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
-
-                                               ByteStreamStateHandle stateHandle =
-                                                               (ByteStreamStateHandle) keyGroupsStateHandle.getDelegateStateHandle();
-                                               HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState =
-                                                               MigrationInstantiationUtil.deserializeObject(stateHandle.getData(), cl);
-
-                                               assertEquals(2, testKeyedState.size());
-                                               for (KvStateSnapshot<?, ?, ?, ?> snapshot : testKeyedState.values()) {
-                                                       MemValueState.Snapshot<?, ?, ?> castedSnapshot = (MemValueState.Snapshot<?, ?, ?>) snapshot;
-                                                       byte[] data = castedSnapshot.getData();
-                                                       assertEquals(t, data[0]);
-                                                       assertEquals(p, data[1]);
-                                               }
-                                       } else {
-                                               assertEquals(null, keyedStateHandle);
-                                       }
-                               }
-
-                               ++t;
-                       }
-
-                       savepoint.dispose();
-
-               } finally {
-                       // Dispose
-                       SavepointStore.removeSavepointFile(path.toString());
-               }
-       }
-
-       private static Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> createTaskStatesOld(
-                       int numTaskStates, int numSubtaskStates) throws Exception {
-
-               List<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-               for (int i = 0; i < numTaskStates; i++) {
-                       org.apache.flink.migration.runtime.checkpoint.TaskState taskState =
-                                       new org.apache.flink.migration.runtime.checkpoint.TaskState(new JobVertexID(), numSubtaskStates);
-                       for (int j = 0; j < numSubtaskStates; j++) {
-
-                               StreamTaskState[] streamTaskStates = new StreamTaskState[2];
-
-                               for (int k = 0; k < streamTaskStates.length; k++) {
-                                       StreamTaskState state = new StreamTaskState();
-                                       Tuple4<Integer, Integer, Integer, Integer> testState = new Tuple4<>(0, i, j, k);
-                                       if (j % 4 != 0) {
-                                               state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
-                                       }
-                                       testState = new Tuple4<>(1, i, j, k);
-                                       state.setOperatorState(new SerializedStateHandle<>(testState));
-
-                                       if ((0 == k) && (i % 3 != 0)) {
-                                               HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
-                                               for (int l = 0; l < 2; ++l) {
-                                                       String name = "keyed-" + l;
-                                                       KvStateSnapshot<?, ?, ?, ?> testKeyedSnapshot =
-                                                                       new MemValueState.Snapshot<>(
-                                                                                       IntSerializer.INSTANCE,
-                                                                                       VoidNamespaceSerializer.INSTANCE,
-                                                                                       IntSerializer.INSTANCE,
-                                                                                       new ValueStateDescriptor<>(name, Integer.class, 0),
-                                                                                       new byte[]{(byte) i, (byte) j});
-                                                       testKeyedState.put(name, testKeyedSnapshot);
-                                               }
-                                               state.setKvStates(testKeyedState);
-                                       }
-                                       streamTaskStates[k] = state;
-                               }
-
-                               StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStates);
-                               org.apache.flink.migration.util.SerializedValue<
-                                               org.apache.flink.migration.runtime.state.StateHandle<?>> handle =
-                                               new org.apache.flink.migration.util.SerializedValue<
-                                                               org.apache.flink.migration.runtime.state.StateHandle<?>>(streamTaskStateList);
-
-                               taskState.putState(j, new org.apache.flink.migration.runtime.checkpoint.SubtaskState(handle, 0, 0));
-                       }
-
-                       taskStates.add(taskState);
-               }
-
-               return taskStates;
-       }
-}

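The deleted test above exercised the framing that SavepointStore writes ahead of the version-specific payload: a magic number, then the savepoint version, then the serialized task states. A minimal sketch of that framing using plain java.io; the MAGIC_NUMBER value below is a placeholder, not the real constant in SavepointStore:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    final class SavepointFramingSketch {
        // Placeholder value; the real constant lives in SavepointStore.MAGIC_NUMBER.
        static final int MAGIC_NUMBER = 0xCAFEBABE;

        static byte[] frame(int version, byte[] payload) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DataOutputStream dos = new DataOutputStream(bos)) {
                dos.writeInt(MAGIC_NUMBER); // identifies the file as a savepoint
                dos.writeInt(version);      // selects the savepoint serializer on load
                dos.write(payload);         // version-specific serialized task states
            }
            return bos.toByteArray();
        }
    }
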
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 933c7a0..173730a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
@@ -98,6 +100,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -560,7 +563,8 @@ public class JobManagerHARecoveryTest extends TestLogger {
                        TaskStateSnapshot taskStateHandles) throws Exception {
                        int subtaskIndex = getIndexInSubtaskGroup();
                        if (subtaskIndex < recoveredStates.length) {
-                               try (FSDataInputStream in = taskStateHandles.getSubtaskStateMappings().iterator().next().getValue().getLegacyOperatorState().openInputStream()) {
+                               OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(taskStateHandles);
+                               try (FSDataInputStream in = operatorStateHandle.openInputStream()) {
                                        recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(in, getUserCodeClassLoader());
                                }
                        }
@@ -572,11 +576,21 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                        String.valueOf(UUID.randomUUID()),
                                        InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
 
+                       Map<String, OperatorStateHandle.StateMetaInfo> stateNameToPartitionOffsets = new HashMap<>(1);
+                       stateNameToPartitionOffsets.put(
+                               "test-state",
+                               new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+
+                       OperatorStateHandle operatorStateHandle = new OperatorStateHandle(stateNameToPartitionOffsets, byteStreamStateHandle);
+
                        TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot();
                        checkpointStateHandles.putSubtaskStateByOperatorID(
                                OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
-                               new OperatorSubtaskState(byteStreamStateHandle)
-                       );
+                               new OperatorSubtaskState(
+                                       Collections.singletonList(operatorStateHandle),
+                                       Collections.emptyList(),
+                                       Collections.emptyList(),
+                                       Collections.emptyList()));
 
                        getEnvironment().acknowledgeCheckpoint(
                                        checkpointMetaData.getCheckpointId(),
@@ -614,5 +628,17 @@ public class JobManagerHARecoveryTest extends TestLogger {
                public static long[] getRecoveredStates() {
                        return recoveredStates;
                }
+
+               private static OperatorStateHandle extractSingletonOperatorState(TaskStateSnapshot taskStateHandles) {
+                       Set<Map.Entry<OperatorID, OperatorSubtaskState>> subtaskStateMappings = taskStateHandles.getSubtaskStateMappings();
+                       Preconditions.checkNotNull(subtaskStateMappings);
+                       Preconditions.checkState(subtaskStateMappings.size() == 1);
+                       OperatorSubtaskState subtaskState = subtaskStateMappings.iterator().next().getValue();
+                       Collection<OperatorStateHandle> managedOperatorState =
+                               Preconditions.checkNotNull(subtaskState).getManagedOperatorState();
+                       Preconditions.checkNotNull(managedOperatorState);
+                       Preconditions.checkState(managedOperatorState.size() == 1);
+                       return managedOperatorState.iterator().next();
+               }
        }
 }

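The JobManagerHARecoveryTest hunks above replace the removed single-argument OperatorSubtaskState(StreamStateHandle) shortcut with an explicitly constructed OperatorStateHandle. A condensed sketch of the new construction, using only types and calls that appear in the diff; the comment naming the constructor slots is an assumption, grounded only in the first collection being what getManagedOperatorState() returns:

    Map<String, OperatorStateHandle.StateMetaInfo> offsets = new HashMap<>(1);
    offsets.put(
        "test-state",
        new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));

    // Wraps the raw bytes with per-state partition offsets so the state can be redistributed.
    OperatorStateHandle handle = new OperatorStateHandle(offsets, byteStreamStateHandle);

    // First collection: managed operator state (see extractSingletonOperatorState above);
    // the remaining three are assumed to be the raw operator and keyed-state slots.
    OperatorSubtaskState subtaskState = new OperatorSubtaskState(
        Collections.singletonList(handle),
        Collections.emptyList(),
        Collections.emptyList(),
        Collections.emptyList());
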
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index d022cdc..b36ac86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -75,7 +75,6 @@ public class CheckpointMessagesTest {
                        checkpointStateHandles.putSubtaskStateByOperatorID(
                                new OperatorID(),
                                new OperatorSubtaskState(
-                                       CheckpointCoordinatorTest.generateStreamStateHandle(new MyHandle()),
                                        CheckpointCoordinatorTest.generatePartitionableStateHandle(new JobVertexID(), 0, 2, 8, false),
                                        null,
                                        CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, Collections.singletonList(new MyHandle())),

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
deleted file mode 100644
index dd6148c..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.api.graph;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphHasher;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-
-import org.apache.flink.shaded.guava18.com.google.common.hash.HashFunction;
-import org.apache.flink.shaded.guava18.com.google.common.hash.Hasher;
-import org.apache.flink.shaded.guava18.com.google.common.hash.Hashing;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-
-import static org.apache.flink.util.StringUtils.byteToHexString;
-
-/**
- * StreamGraphHasher from Flink 1.1. This contains duplicated code to ensure that the algorithm does not change with
- * future Flink versions.
- *
- * <p>DO NOT MODIFY THIS CLASS
- */
-public class StreamGraphHasherV1 implements StreamGraphHasher {
-
-       private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV1.class);
-
-       @Override
-       public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
-               // The hash function used to generate the hash
-               final HashFunction hashFunction = Hashing.murmur3_128(0);
-               final Map<Integer, byte[]> hashes = new HashMap<>();
-
-               Set<Integer> visited = new HashSet<>();
-               Queue<StreamNode> remaining = new ArrayDeque<>();
-
-               // We need to make the source order deterministic. The source IDs are
-               // not returned in the same order, which means that submitting the same
-               // program twice might result in different traversal, which breaks the
-               // deterministic hash assignment.
-               List<Integer> sources = new ArrayList<>();
-               for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
-                       sources.add(sourceNodeId);
-               }
-               Collections.sort(sources);
-
-               //
-               // Traverse the graph in a breadth-first manner. Keep in mind that
-               // the graph is not a tree and multiple paths to nodes can exist.
-               //
-
-               // Start with source nodes
-               for (Integer sourceNodeId : sources) {
-                       remaining.add(streamGraph.getStreamNode(sourceNodeId));
-                       visited.add(sourceNodeId);
-               }
-
-               StreamNode currentNode;
-               while ((currentNode = remaining.poll()) != null) {
-                       // Generate the hash code. Because multiple path exist to each
-                       // node, we might not have all required inputs available to
-                       // generate the hash code.
-                       if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
-                               // Add the child nodes
-                               for (StreamEdge outEdge : currentNode.getOutEdges()) {
-                                       StreamNode child = outEdge.getTargetVertex();
-
-                                       if (!visited.contains(child.getId())) {
-                                               remaining.add(child);
-                                               visited.add(child.getId());
-                                       }
-                               }
-                       } else {
-                               // We will revisit this later.
-                               visited.remove(currentNode.getId());
-                       }
-               }
-
-               return hashes;
-       }
-
-       /**
-        * Generates a hash for the node and returns whether the operation was
-        * successful.
-        *
-        * @param node         The node to generate the hash for
-        * @param hashFunction The hash function to use
-        * @param hashes       The current state of generated hashes
-        * @return <code>true</code> if the node hash has been generated.
-        * <code>false</code>, otherwise. If the operation is not successful, the
-        * hash needs be generated at a later point when all input is available.
-        * @throws IllegalStateException If node has user-specified hash and is
-        *                               intermediate node of a chain
-        */
-       private boolean generateNodeHash(
-                       StreamNode node,
-                       HashFunction hashFunction,
-                       Map<Integer, byte[]> hashes,
-                       boolean isChainingEnabled) {
-
-               // Check for user-specified ID
-               String userSpecifiedHash = node.getTransformationUID();
-
-               if (userSpecifiedHash == null) {
-                       // Check that all input nodes have their hashes computed
-                       for (StreamEdge inEdge : node.getInEdges()) {
-                               // If the input node has not been visited yet, the current
-                               // node will be visited again at a later point when all input
-                               // nodes have been visited and their hashes set.
-                               if (!hashes.containsKey(inEdge.getSourceId())) {
-                                       return false;
-                               }
-                       }
-
-                       Hasher hasher = hashFunction.newHasher();
-                       byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
-
-                       if (hashes.put(node.getId(), hash) != null) {
-                               // Sanity check
-                               throw new IllegalStateException("Unexpected state. Tried to add node hash " +
-                                               "twice. This is probably a bug in the JobGraph generator.");
-                       }
-
-                       return true;
-               } else {
-                       Hasher hasher = hashFunction.newHasher();
-                       byte[] hash = generateUserSpecifiedHash(node, hasher);
-
-                       for (byte[] previousHash : hashes.values()) {
-                               if (Arrays.equals(previousHash, hash)) {
-                                       throw new IllegalArgumentException("Hash collision on user-specified ID. " +
-                                                       "Most likely cause is a non-unique ID. Please check that all IDs " +
-                                                       "specified via `uid(String)` are unique.");
-                               }
-                       }
-
-                       if (hashes.put(node.getId(), hash) != null) {
-                               // Sanity check
-                               throw new IllegalStateException("Unexpected state. Tried to add node hash " +
-                                               "twice. This is probably a bug in the JobGraph generator.");
-                       }
-
-                       return true;
-               }
-       }
-
-       /**
-        * Generates a hash from a user-specified ID.
-        */
-       private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
-               hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
-
-               return hasher.hash().asBytes();
-       }
-
-       /**
-        * Generates a deterministic hash from node-local properties and input and
-        * output edges.
-        */
-       private byte[] generateDeterministicHash(
-                       StreamNode node,
-                       Hasher hasher,
-                       Map<Integer, byte[]> hashes,
-                       boolean isChainingEnabled) {
-
-               // Include stream node to hash. We use the current size of the computed
-               // hashes as the ID. We cannot use the node's ID, because it is
-               // assigned from a static counter. This will result in two identical
-               // programs having different hashes.
-               generateNodeLocalHash(node, hasher, hashes.size());
-
-               // Include chained nodes to hash
-               for (StreamEdge outEdge : node.getOutEdges()) {
-                       if (isChainable(outEdge, isChainingEnabled)) {
-                               StreamNode chainedNode = outEdge.getTargetVertex();
-
-                               // Use the hash size again, because the nodes are chained to
-                               // this node. This does not add a hash for the chained nodes.
-                               generateNodeLocalHash(chainedNode, hasher, hashes.size());
-                       }
-               }
-
-               byte[] hash = hasher.hash().asBytes();
-
-               // Make sure that all input nodes have their hash set before entering
-               // this loop (calling this method).
-               for (StreamEdge inEdge : node.getInEdges()) {
-                       byte[] otherHash = hashes.get(inEdge.getSourceId());
-
-                       // Sanity check
-                       if (otherHash == null) {
-                               throw new IllegalStateException("Missing hash for input node "
-                                               + inEdge.getSourceVertex() + ". Cannot generate hash for "
-                                               + node + ".");
-                       }
-
-                       for (int j = 0; j < hash.length; j++) {
-                               hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
-                       }
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       String udfClassName = "";
-                       if (node.getOperator() instanceof AbstractUdfStreamOperator) {
-                               udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
-                                               .getUserFunction().getClass().getName();
-                       }
-
-                       LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
-                                       "'" + node.toString() + "' {id: " + node.getId() + ", " +
-                                       "parallelism: " + node.getParallelism() + ", " +
-                                       "user function: " + udfClassName + "}");
-               }
-
-               return hash;
-       }
-
-       private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
-               StreamNode upStreamVertex = edge.getSourceVertex();
-               StreamNode downStreamVertex = edge.getTargetVertex();
-
-               StreamOperator<?> headOperator = upStreamVertex.getOperator();
-               StreamOperator<?> outOperator = downStreamVertex.getOperator();
-
-               return downStreamVertex.getInEdges().size() == 1
-                               && outOperator != null
-                               && headOperator != null
-                               && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
-                               && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
-                               && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
-                               headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
-                               && (edge.getPartitioner() instanceof ForwardPartitioner)
-                               && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
-                               && isChainingEnabled;
-       }
-
-       private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
-               hasher.putInt(id);
-
-               hasher.putInt(node.getParallelism());
-
-               if (node.getOperator() instanceof AbstractUdfStreamOperator) {
-                       String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
-                                       .getUserFunction().getClass().getName();
-
-                       hasher.putString(udfClassName, Charset.forName("UTF-8"));
-               }
-       }
-}

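Before it was deleted, the hasher above defined how pre-1.2 savepoints mapped state to operators: a murmur3 hash over node-local properties (traversal position, parallelism, chained UDF class names), folded byte-wise with the hashes of all input nodes. The folding step, lifted verbatim from generateDeterministicHash above:

    // Folds one input node's hash into this node's hash, byte by byte.
    static void foldInputHash(byte[] hash, byte[] otherHash) {
        for (int j = 0; j < hash.length; j++) {
            hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
        }
    }
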
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
deleted file mode 100644
index b1471b2..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.runtime.streamrecord;
-
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Legacy multiplexing {@link TypeSerializer} for stream records, watermarks and other stream
- * elements.
- */
-public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
-
-
-       private static final long serialVersionUID = 1L;
-
-       private static final int TAG_REC_WITH_TIMESTAMP = 0;
-       private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
-       private static final int TAG_WATERMARK = 2;
-
-
-       private final TypeSerializer<T> typeSerializer;
-
-       public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
-               if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
-                       throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
-               }
-               this.typeSerializer = requireNonNull(serializer);
-       }
-
-       public TypeSerializer<T> getContainedTypeSerializer() {
-               return this.typeSerializer;
-       }
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public MultiplexingStreamRecordSerializer<T> duplicate() {
-               TypeSerializer<T> copy = typeSerializer.duplicate();
-               return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
-       }
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       @Override
-       public StreamRecord<T> createInstance() {
-               return new StreamRecord<T>(typeSerializer.createInstance());
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public StreamElement copy(StreamElement from) {
-               // we can reuse the timestamp since Instant is immutable
-               if (from.isRecord()) {
-                       StreamRecord<T> fromRecord = from.asRecord();
-                       return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
-               }
-               else if (from.isWatermark()) {
-                       // is immutable
-                       return from;
-               }
-               else {
-                       throw new RuntimeException();
-               }
-       }
-
-       @Override
-       public StreamElement copy(StreamElement from, StreamElement reuse) {
-               if (from.isRecord() && reuse.isRecord()) {
-                       StreamRecord<T> fromRecord = from.asRecord();
-                       StreamRecord<T> reuseRecord = reuse.asRecord();
-
-                       T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
-                       fromRecord.copyTo(valueCopy, reuseRecord);
-                       return reuse;
-               }
-               else if (from.isWatermark()) {
-                       // is immutable
-                       return from;
-               }
-               else {
-                       throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
-               }
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws IOException {
-               int tag = source.readByte();
-               target.write(tag);
-
-               if (tag == TAG_REC_WITH_TIMESTAMP) {
-                       // move timestamp
-                       target.writeLong(source.readLong());
-                       typeSerializer.copy(source, target);
-               }
-               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-                       typeSerializer.copy(source, target);
-               }
-               else if (tag == TAG_WATERMARK) {
-                       target.writeLong(source.readLong());
-               }
-               else {
-                       throw new IOException("Corrupt stream, found tag: " + tag);
-               }
-       }
-
-       @Override
-       public void serialize(StreamElement value, DataOutputView target) throws IOException {
-               if (value.isRecord()) {
-                       StreamRecord<T> record = value.asRecord();
-
-                       if (record.hasTimestamp()) {
-                               target.write(TAG_REC_WITH_TIMESTAMP);
-                               target.writeLong(record.getTimestamp());
-                       } else {
-                               target.write(TAG_REC_WITHOUT_TIMESTAMP);
-                       }
-                       typeSerializer.serialize(record.getValue(), target);
-               }
-               else if (value.isWatermark()) {
-                       target.write(TAG_WATERMARK);
-                       target.writeLong(value.asWatermark().getTimestamp());
-               }
-               else {
-                       throw new RuntimeException();
-               }
-       }
-
-       @Override
-       public StreamElement deserialize(DataInputView source) throws IOException {
-               int tag = source.readByte();
-               if (tag == TAG_REC_WITH_TIMESTAMP) {
-                       long timestamp = source.readLong();
-                       return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
-               }
-               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-                       return new StreamRecord<T>(typeSerializer.deserialize(source));
-               }
-               else if (tag == TAG_WATERMARK) {
-                       return new Watermark(source.readLong());
-               }
-               else {
-                       throw new IOException("Corrupt stream, found tag: " + tag);
-               }
-       }
-
-       @Override
-       public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
-               int tag = source.readByte();
-               if (tag == TAG_REC_WITH_TIMESTAMP) {
-                       long timestamp = source.readLong();
-                       T value = typeSerializer.deserialize(source);
-                       StreamRecord<T> reuseRecord = reuse.asRecord();
-                       reuseRecord.replace(value, timestamp);
-                       return reuseRecord;
-               }
-               else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
-                       T value = typeSerializer.deserialize(source);
-                       StreamRecord<T> reuseRecord = reuse.asRecord();
-                       reuseRecord.replace(value);
-                       return reuseRecord;
-               }
-               else if (tag == TAG_WATERMARK) {
-                       return new Watermark(source.readLong());
-               }
-               else {
-                       throw new IOException("Corrupt stream, found tag: " + tag);
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // Serializer configuration snapshotting & compatibility
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public MultiplexingStreamRecordSerializerConfigSnapshot snapshotConfiguration() {
-               return new MultiplexingStreamRecordSerializerConfigSnapshot<>(typeSerializer);
-       }
-
-       @Override
-       public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-               if (configSnapshot instanceof MultiplexingStreamRecordSerializerConfigSnapshot) {
-                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig =
-                               ((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-                       CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-                                       previousTypeSerializerAndConfig.f0,
-                                       UnloadableDummyTypeSerializer.class,
-                                       previousTypeSerializerAndConfig.f1,
-                                       typeSerializer);
-
-                       if (!compatResult.isRequiresMigration()) {
-                               return CompatibilityResult.compatible();
-                       } else if (compatResult.getConvertDeserializer() != null) {
-                               return CompatibilityResult.requiresMigration(
-                                       new MultiplexingStreamRecordSerializer<>(
-                                               new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-                       }
-               }
-
-               return CompatibilityResult.requiresMigration();
-       }
-
-       /**
-        * Configuration snapshot specific to the {@link MultiplexingStreamRecordSerializer}.
-        */
-       public static final class MultiplexingStreamRecordSerializerConfigSnapshot<T>
-                       extends CompositeTypeSerializerConfigSnapshot {
-
-               private static final int VERSION = 1;
-
-               /** This empty nullary constructor is required for deserializing the configuration. */
-               public MultiplexingStreamRecordSerializerConfigSnapshot() {}
-
-               public MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
-                       super(typeSerializer);
-               }
-
-               @Override
-               public int getVersion() {
-                       return VERSION;
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof MultiplexingStreamRecordSerializer) {
-                       MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
-
-                       return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof MultiplexingStreamRecordSerializer;
-       }
-
-       @Override
-       public int hashCode() {
-               return typeSerializer.hashCode();
-       }
-}
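The wire format the deleted serializer multiplexed is a one-byte tag, an optional timestamp, and the element payload: tag 0 is a record with a timestamp, 1 a record without, and 2 a watermark. A stripped-down sketch of the write side, with the value TypeSerializer replaced by a pre-serialized byte array for illustration:

    import java.io.DataOutputStream;
    import java.io.IOException;

    final class StreamElementFramingSketch {
        static void writeRecord(DataOutputStream target, Long timestampOrNull, byte[] value) throws IOException {
            if (timestampOrNull != null) {
                target.write(0);                   // TAG_REC_WITH_TIMESTAMP
                target.writeLong(timestampOrNull); // event timestamp
            } else {
                target.write(1);                   // TAG_REC_WITHOUT_TIMESTAMP
            }
            target.write(value);                   // element payload
        }

        static void writeWatermark(DataOutputStream target, long timestamp) throws IOException {
            target.write(2);                       // TAG_WATERMARK
            target.writeLong(timestamp);
        }
    }
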

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
deleted file mode 100644
index e018ba0..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.migration.streaming.runtime.streamrecord;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
- * the element.
- *
- * <p>{@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
- * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
- * stream with {@link StreamRecord StreamRecords}.
- *
- * @see MultiplexingStreamRecordSerializer
- *
- * @param <T> The type of value in the {@link StreamRecord}
- */
-@Internal
-public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final TypeSerializer<T> typeSerializer;
-
-       public StreamRecordSerializer(TypeSerializer<T> serializer) {
-               if (serializer instanceof StreamRecordSerializer) {
-                       throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
-               }
-               this.typeSerializer = Preconditions.checkNotNull(serializer);
-       }
-
-       public TypeSerializer<T> getContainedTypeSerializer() {
-               return this.typeSerializer;
-       }
-
-       // ------------------------------------------------------------------------
-       //  General serializer and type utils
-       // ------------------------------------------------------------------------
-
-       @Override
-       public StreamRecordSerializer<T> duplicate() {
-               TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
-               return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
-       }
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public int getLength() {
-               return typeSerializer.getLength();
-       }
-
-       // ------------------------------------------------------------------------
-       //  Type serialization, copying, instantiation
-       // ------------------------------------------------------------------------
-
-       @Override
-       public StreamRecord<T> createInstance() {
-               try {
-                       return new StreamRecord<T>(typeSerializer.createInstance());
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot instantiate StreamRecord.", e);
-               }
-       }
-
-       @Override
-       public StreamRecord<T> copy(StreamRecord<T> from) {
-               return from.copy(typeSerializer.copy(from.getValue()));
-       }
-
-       @Override
-       public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
-               from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse);
-               return reuse;
-       }
-
-       @Override
-       public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
-               typeSerializer.serialize(value.getValue(), target);
-       }
-
-       @Override
-       public StreamRecord<T> deserialize(DataInputView source) throws IOException {
-               return new StreamRecord<T>(typeSerializer.deserialize(source));
-       }
-
-       @Override
-       public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
-               T element = typeSerializer.deserialize(reuse.getValue(), source);
-               reuse.replace(element);
-               return reuse;
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws IOException {
-               typeSerializer.copy(source, target);
-       }
-
-       // ------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof StreamRecordSerializer) {
-                       StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj;
-
-                       return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof StreamRecordSerializer;
-       }
-
-       @Override
-       public int hashCode() {
-               return typeSerializer.hashCode();
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // Serializer configuration snapshotting & compatibility
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public StreamRecordSerializerConfigSnapshot snapshotConfiguration() {
-               return new StreamRecordSerializerConfigSnapshot<>(typeSerializer);
-       }
-
-       @Override
-       public CompatibilityResult<StreamRecord<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-               if (configSnapshot instanceof StreamRecordSerializerConfigSnapshot) {
-                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig =
-                               ((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-                       CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-                                       previousTypeSerializerAndConfig.f0,
-                                       UnloadableDummyTypeSerializer.class,
-                                       previousTypeSerializerAndConfig.f1,
-                                       typeSerializer);
-
-                       if (!compatResult.isRequiresMigration()) {
-                               return CompatibilityResult.compatible();
-                       } else if (compatResult.getConvertDeserializer() != null) {
-                               return CompatibilityResult.requiresMigration(
-                                       new StreamRecordSerializer<>(
-                                               new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-                       }
-               }
-
-               return CompatibilityResult.requiresMigration();
-       }
-
-       /**
-        * Configuration snapshot specific to the {@link StreamRecordSerializer}.
-        */
-       public static final class StreamRecordSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
-
-               private static final int VERSION = 1;
-
-               /** This empty nullary constructor is required for deserializing the configuration. */
-               public StreamRecordSerializerConfigSnapshot() {}
-
-               public StreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
-                       super(typeSerializer);
-               }
-
-               @Override
-               public int getVersion() {
-                       return VERSION;
-               }
-       }
-}
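
The snapshotConfiguration()/ensureCompatibility() pair deleted above is the pre-Flink-1.7 serializer compatibility protocol. A hedged sketch of that handshake, assuming a Flink 1.3-era classpath; IntSerializer is used purely for illustration and is not part of this commit:

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;

public class CompatibilityHandshakeSketch {
    public static void main(String[] args) {
        // taken at checkpoint time and persisted alongside the state
        TypeSerializer<Integer> writtenSerializer = IntSerializer.INSTANCE;
        TypeSerializerConfigSnapshot snapshot = writtenSerializer.snapshotConfiguration();

        // on restore, the (possibly upgraded) serializer inspects the old snapshot
        TypeSerializer<Integer> restoredSerializer = IntSerializer.INSTANCE;
        CompatibilityResult<Integer> result = restoredSerializer.ensureCompatibility(snapshot);
        if (result.isRequiresMigration()) {
            // the state would have to be re-read through a convert deserializer
            System.out.println("migration required");
        }
    }
}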

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
deleted file mode 100644
index cb3c7cc..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This interface must be implemented by functions that have state that needs to be
- * checkpointed. The functions get a call whenever a checkpoint should take place
- * and return a snapshot of their state, which will be checkpointed.
- *
- * <h1>Deprecation and Replacement</h1>
- * The shortcut replacement for this interface is via {@link ListCheckpointed} and works
- * as shown in the example below. The {@code ListCheckpointed} interface returns a list of
- * elements that together form the state snapshot.
- *
- * <p><pre>{@code
- * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
- *
- *     private int count;
- *
- *     public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- *         return Collections.singletonList(this.count);
- *     }
- *
- *     public void restoreState(List<Integer> state) throws Exception {
- *         this.count = state.isEmpty() ? 0 : state.get(0);
- *     }
- *
- *     public T map(T value) {
- *         count++;
- *         return value;
- *     }
- * }
- * }</pre>
- *
- * @param <T> The type of the operator state.
- *
- * @deprecated Please use {@link ListCheckpointed} as illustrated above, or
- *             {@link CheckpointedFunction} for more control over the checkpointing process.
- */
-@Deprecated
-@PublicEvolving
-public interface Checkpointed<T extends Serializable> extends CheckpointedRestoring<T> {
-
-       /**
-        * Gets the current state of the function or operator. The state must reflect the result of all
-        * prior invocations to this function.
-        *
-        * @param checkpointId The ID of the checkpoint.
-        * @param checkpointTimestamp The timestamp of the checkpoint, as derived by
-        *                            System.currentTimeMillis() on the JobManager.
-        *
-        * @return A snapshot of the operator state.
-        *
-        * @throws Exception Thrown if the creation of the state object failed. This causes the
-        *                   checkpoint to fail. The system may decide to fail the operation (and trigger
-        *                   recovery), or to discard this checkpoint attempt and to continue running
-        *                   and to try again with the next checkpoint attempt.
-        */
-       T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
-}
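
For reference, a compilable version of the ListCheckpointed migration pattern sketched in the javadoc above; the class name CounterMapper is illustrative and not part of this commit:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

import java.util.Collections;
import java.util.List;

public class CounterMapper<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {

    private int count;

    @Override
    public List<Integer> snapshotState(long checkpointId, long timestamp) {
        // a single-element list; larger lists are redistributed on rescaling
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Integer> state) {
        // after rescaling, a subtask may receive zero or several elements
        count = 0;
        for (Integer c : state) {
            count += c;
        }
    }

    @Override
    public T map(T value) {
        count++;
        return value;
    }
}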

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
deleted file mode 100644
index 5138b49..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This interface marks a function/operator as checkpointed similar to the
- * {@link Checkpointed} interface, but gives the Flink framework the option to
- * perform the checkpoint asynchronously. Note that asynchronous checkpointing for
- * this interface has not been implemented.
- *
- * <h1>Deprecation and Replacement</h1>
- * The shortcut replacement for this interface is via {@link ListCheckpointed} and works
- * as shown in the example below. Please refer to the JavaDocs of {@link ListCheckpointed} for
- * a more detailed description of how to use the new interface.
- *
- * <p><pre>{@code
- * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> {
- *
- *     private int count;
- *
- *     public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- *         return Collections.singletonList(this.count);
- *     }
- *
- *     public void restoreState(List<Integer> state) throws Exception {
- *         this.count = state.isEmpty() ? 0 : state.get(0);
- *     }
- *
- *     public T map(T value) {
- *         count++;
- *         return value;
- *     }
- * }
- * }</pre>
- *
- * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction} instead,
- *             as illustrated in the example above.
- */
-@Deprecated
-@PublicEvolving
-public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
deleted file mode 100644
index cfaa505..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.Serializable;
-
-/**
- * This deprecated interface contains the methods for restoring state from the
- * legacy checkpointing mechanism.
- * @param <T> type of the restored state.
- *
- * @deprecated Please use {@link CheckpointedFunction} or {@link ListCheckpointed} after restoring your legacy state.
- */
-@Deprecated
-@PublicEvolving
-public interface CheckpointedRestoring<T extends Serializable> {
-       /**
-        * Restores the state of the function or operator to that of a previous checkpoint.
-        * This method is invoked when a function is executed as part of a recovery run.
-        *
-        * <p>Note that restoreState() is called before open().
-        *
-        * @param state The state to be restored.
-        */
-       void restoreState(T state) throws Exception;
-}
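
A hedged sketch of the CheckpointedFunction replacement named in the deprecation note above; like restoreState(), its initializeState() method runs before open(). All names below are illustrative, not part of this commit:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountingMapper<T> implements MapFunction<T, T>, CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;
    private long count;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // runs before open(), mirroring the restoreState()-before-open() contract
        checkpointedCount = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long c : checkpointedCount.get()) {
            count += c;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public T map(T value) {
        count++;
        return value;
    }
}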

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
deleted file mode 100644
index bb6e4bc..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-/**
- * For specifying what type of window operator was used to create the state
- * that a {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}
- * is restoring from. This is used to signal that state written using an aligned processing-time
- * window operator should be restored.
- */
-public enum LegacyWindowOperatorType {
-
-       FAST_ACCUMULATING(true, false),
-
-       FAST_AGGREGATING(false, true),
-
-       NONE(false, false);
-
-       // ------------------------------------------------------------------------
-
-       private final boolean fastAccumulating;
-       private final boolean fastAggregating;
-
-       LegacyWindowOperatorType(boolean fastAccumulating, boolean fastAggregating) {
-               this.fastAccumulating = fastAccumulating;
-               this.fastAggregating = fastAggregating;
-       }
-
-       public boolean isFastAccumulating() {
-               return fastAccumulating;
-       }
-
-       public boolean isFastAggregating() {
-               return fastAggregating;
-       }
-
-       @Override
-       public String toString() {
-               if (fastAccumulating) {
-                       return "AccumulatingProcessingTimeWindowOperator";
-               } else if (fastAggregating) {
-                       return "AggregatingProcessingTimeWindowOperator";
-               } else {
-                       return "WindowOperator";
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 348861f..f904a10 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -50,19 +49,11 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
@@ -227,33 +218,7 @@ public class WindowedStream<T, K, W extends Window> {
 
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
-
-               String callLocation = Utils.getCallLocationName();
-               String udfName = "WindowedStream." + callLocation;
-
-               SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-               if (result != null) {
-                       return result;
-               }
-
-               LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function);
-               return reduce(function, new PassThroughWindowFunction<K, W, T>(), legacyOpType);
-       }
-
-       /**
-        * Applies the given window function to each window. The window function is called for each
-        * evaluation of the window for each key individually. The output of the window function is
-        * interpreted as a regular non-windowed stream.
-        *
-        * <p>Arriving data is incrementally aggregated using the given reducer.
-        *
-        * @param reduceFunction The reduce function that is used for incremental aggregation.
-        * @param function The window function.
-        * @return The data stream that is the result of applying the window function to the window.
-        */
-       @PublicEvolving
-       public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
-               return reduce(reduceFunction, function, LegacyWindowOperatorType.NONE);
+               return reduce(function, new PassThroughWindowFunction<K, W, T>());
        }
 
        /**
@@ -265,39 +230,15 @@ public class WindowedStream<T, K, W extends Window> {
         *
         * @param reduceFunction The reduce function that is used for incremental aggregation.
         * @param function The window function.
-        * @param resultType Type information for the result type of the window function
         * @return The data stream that is the result of applying the window function to the window.
         */
-       @PublicEvolving
        public <R> SingleOutputStreamOperator<R> reduce(
-               ReduceFunction<T> reduceFunction,
-               WindowFunction<T, R, K, W> function,
-               TypeInformation<R> resultType) {
-               return reduce(reduceFunction, function, resultType, LegacyWindowOperatorType.NONE);
-       }
-
-       /**
-        * Applies the given window function to each window. The window function is called for each
-        * evaluation of the window for each key individually. The output of the window function is
-        * interpreted as a regular non-windowed stream.
-        *
-        * <p>Arriving data is incrementally aggregated using the given reducer.
-        *
-        * @param reduceFunction The reduce function that is used for incremental aggregation.
-        * @param function The window function.
-        * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
-        *                           the type of the previous operator whose state we inherit.
-        * @return The data stream that is the result of applying the window function to the window.
-        */
-       private <R> SingleOutputStreamOperator<R> reduce(
                        ReduceFunction<T> reduceFunction,
-                       WindowFunction<T, R, K, W> function,
-                       LegacyWindowOperatorType legacyWindowOpType) {
+                       WindowFunction<T, R, K, W> function) {
 
                TypeInformation<T> inType = input.getType();
                TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
-
-               return reduce(reduceFunction, function, resultType, legacyWindowOpType);
+               return reduce(reduceFunction, function, resultType);
        }
 
        /**
@@ -310,15 +251,12 @@ public class WindowedStream<T, K, W extends Window> {
         * @param reduceFunction The reduce function that is used for incremental aggregation.
         * @param function The window function.
         * @param resultType Type information for the result type of the window function.
-        * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
-        *                           the type of the previous operator whose state we inherit.
         * @return The data stream that is the result of applying the window function to the window.
         */
-       private <R> SingleOutputStreamOperator<R> reduce(
+       public <R> SingleOutputStreamOperator<R> reduce(
                        ReduceFunction<T> reduceFunction,
                        WindowFunction<T, R, K, W> function,
-                       TypeInformation<R> resultType,
-                       LegacyWindowOperatorType legacyWindowOpType) {
+                       TypeInformation<R> resultType) {
 
                if (reduceFunction instanceof RichFunction) {
                        throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
@@ -374,8 +312,7 @@ public class WindowedStream<T, K, W extends Window> {
                                        new InternalSingleValueWindowFunction<>(function),
                                        trigger,
                                        allowedLateness,
-                                       lateDataOutputTag,
-                                       legacyWindowOpType);
+                                       lateDataOutputTag);
                }
 
                return input.transform(opName, resultType, operator);
@@ -1183,12 +1120,6 @@ public class WindowedStream<T, K, W extends Window> {
 
                String udfName = "WindowedStream." + callLocation;
 
-               SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-               if (result != null) {
-                       return result;
-               }
-
-               LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);
                String opName;
                KeySelector<T, K> keySel = input.getKeySelector();
 
@@ -1231,8 +1162,7 @@ public class WindowedStream<T, K, W extends Window> {
                                        function,
                                        trigger,
                                        allowedLateness,
-                                       lateDataOutputTag,
-                                       legacyWindowOpType);
+                                       lateDataOutputTag);
                }
 
                return input.transform(opName, resultType, operator);
@@ -1629,119 +1559,6 @@ public class WindowedStream<T, K, W extends Window> {
                return reduce(aggregator);
        }
 
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       private LegacyWindowOperatorType getLegacyWindowType(Function function) {
-               if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-                       if (function instanceof ReduceFunction) {
-                               return LegacyWindowOperatorType.FAST_AGGREGATING;
-                       } else if (function instanceof WindowFunction) {
-                               return LegacyWindowOperatorType.FAST_ACCUMULATING;
-                       }
-               } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-                       if (function instanceof ReduceFunction) {
-                               return LegacyWindowOperatorType.FAST_AGGREGATING;
-                       } else if (function instanceof WindowFunction) {
-                               return LegacyWindowOperatorType.FAST_ACCUMULATING;
-                       }
-               }
-               return LegacyWindowOperatorType.NONE;
-       }
-
-       private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-                       ReduceFunction<?> function,
-                       TypeInformation<R> resultType,
-                       String functionName) {
-
-               if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
-                       SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
-                       final long windowLength = timeWindows.getSize();
-                       final long windowSlide = timeWindows.getSlide();
-
-                       String opName = "Fast " + timeWindows + " of " + functionName;
-
-                       @SuppressWarnings("unchecked")
-                       ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-                       @SuppressWarnings("unchecked")
-                       OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-                                       new AggregatingProcessingTimeWindowOperator<>(
-                                                       reducer, input.getKeySelector(),
-                                                       input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                                       input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-                                                       windowLength, windowSlide);
-                       return input.transform(opName, resultType, op);
-
-               } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
-                       TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
-                       final long windowLength = timeWindows.getSize();
-                       final long windowSlide = timeWindows.getSize();
-
-                       String opName = "Fast " + timeWindows + " of " + functionName;
-
-                       @SuppressWarnings("unchecked")
-                       ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-                       @SuppressWarnings("unchecked")
-                       OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-                                       new AggregatingProcessingTimeWindowOperator<>(
-                                                       reducer,
-                                                       input.getKeySelector(),
-                                                       input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                                       input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-                                                       windowLength, windowSlide);
-                       return input.transform(opName, resultType, op);
-               }
-
-               return null;
-       }
-
-       private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-                       InternalWindowFunction<Iterable<T>, R, K, W> function,
-                       TypeInformation<R> resultType,
-                       String functionName) {
-
-               if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
-                       SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
-                       final long windowLength = timeWindows.getSize();
-                       final long windowSlide = timeWindows.getSlide();
-
-                       String opName = "Fast " + timeWindows + " of " + functionName;
-
-                       @SuppressWarnings("unchecked")
-                       InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
-                                       (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
-
-                       OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-                                       timeWindowFunction, input.getKeySelector(),
-                                       input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       windowLength, windowSlide);
-                       return input.transform(opName, resultType, op);
-               } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
-                       TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
-                       final long windowLength = timeWindows.getSize();
-                       final long windowSlide = timeWindows.getSize();
-
-                       String opName = "Fast " + timeWindows + " of " + functionName;
-
-                       @SuppressWarnings("unchecked")
-                       InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction =
-                                       (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function;
-
-                       OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-                                       timeWindowFunction, input.getKeySelector(),
-                                       input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-                                       windowLength, windowSlide);
-                       return input.transform(opName, resultType, op);
-               }
-
-               return null;
-       }
-
        public StreamExecutionEnvironment getExecutionEnvironment() {
                return input.getExecutionEnvironment();
        }
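
With the aligned-window fast path removed, windowed reduces always go through the WindowOperator path, and the three-argument reduce overload above is plain public API. A hedged usage sketch; the stream shape and names are assumed for illustration, not taken from this commit:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowedReduceSketch {

    // assumes a stream of (word, count) pairs keyed by the String field
    public static DataStream<Long> countsPerWindow(KeyedStream<Tuple2<String, Long>, String> keyed) {
        return keyed
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .reduce(
                // incremental pre-aggregation: one element kept per key and window
                (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1),
                new WindowFunction<Tuple2<String, Long>, Long, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window,
                            Iterable<Tuple2<String, Long>> input, Collector<Long> out) {
                        // the iterable carries exactly the pre-reduced element
                        out.collect(input.iterator().next().f1);
                    }
                },
                BasicTypeInfo.LONG_TYPE_INFO);
    }
}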

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 3c4cfbd..fedd791 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -32,7 +32,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -66,7 +65,7 @@ import java.util.TreeMap;
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-       extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction, CheckpointedRestoring<Long> {
+       extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction {
 
        private static final long serialVersionUID = 1L;
 
@@ -375,12 +374,4 @@ public class ContinuousFileMonitoringFunction<OUT>
                        LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
                }
        }
-
-       @Override
-       public void restoreState(Long state) throws Exception {
-               this.globalModificationTime = state;
-
-               LOG.info("{} (taskIdx={}) restored global modification time from an older Flink version: {}",
-                       getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), globalModificationTime);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 3a9e8e1..e14cfda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -25,30 +25,23 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -60,15 +53,15 @@ import static org.apache.flink.util.Preconditions.checkState;
 * The operator that reads the {@link TimestampedFileInputSplit splits} received from the preceding
 * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
 * which has a parallelism of 1, this operator can have DOP > 1.
- * <p/>
- * As soon as a split descriptor is received, it is put in a queue, and another
+ *
+ * <p>As soon as a split descriptor is received, it is put in a queue, and another
 * thread reads the actual data of the split. This architecture allows the separation of the
 * reading thread from the one emitting the checkpoint barriers, thus removing any potential
  * back-pressure.
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-       implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, CheckpointedRestoringOperator {
+       implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
 
        private static final long serialVersionUID = 1L;
 
@@ -422,83 +415,4 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
                                getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState);
                }
        }
-
-       // ------------------------------------------------------------------------
-       //  Restoring / Migrating from an older Flink version.
-       // ------------------------------------------------------------------------
-
-       @Override
-       public void restoreState(FSDataInputStream in) throws Exception {
-
-               LOG.info("{} (taskIdx={}) restoring state from an older Flink version.",
-                       getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
-
-               // this is just to read the byte indicating if we have udf state or not
-               int hasUdfState = in.read();
-
-               Preconditions.checkArgument(hasUdfState == 0);
-
-               final ObjectInputStream ois = new ObjectInputStream(in);
-               final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
-
-               // read the split that was being read
-               FileInputSplit currSplit = (FileInputSplit) ois.readObject();
-
-               // read the pending splits list
-               List<FileInputSplit> pendingSplits = new LinkedList<>();
-               int noOfSplits = div.readInt();
-               for (int i = 0; i < noOfSplits; i++) {
-                       FileInputSplit split = (FileInputSplit) ois.readObject();
-                       pendingSplits.add(split);
-               }
-
-               // read the state of the format
-               Serializable formatState = (Serializable) ois.readObject();
-
-               div.close();
-
-               if (restoredReaderState == null) {
-                       restoredReaderState = new ArrayList<>();
-               }
-
-               // we do not know the modification time of the retrieved splits, so we assign them
-               // artificial ones, with the only constraint that they respect the relative order of the
-               // retrieved splits, because modification time is going to be used to sort the splits within
-               // the "pending splits" priority queue.
-
-               long now = getProcessingTimeService().getCurrentProcessingTime();
-               long runningModTime = Math.max(now, noOfSplits + 1);
-
-               TimestampedFileInputSplit currentSplit = createTimestampedFileSplit(currSplit, --runningModTime, formatState);
-               restoredReaderState.add(currentSplit);
-               for (FileInputSplit split : pendingSplits) {
-                       TimestampedFileInputSplit timestampedSplit = createTimestampedFileSplit(split, --runningModTime);
-                       restoredReaderState.add(timestampedSplit);
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("{} (taskIdx={}) restored {} splits from legacy: {}.",
-                                       getClass().getSimpleName(),
-                                       getRuntimeContext().getIndexOfThisSubtask(),
-                                       restoredReaderState.size(),
-                                       restoredReaderState);
-                       }
-               }
-       }
-
-       private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime) {
-               return createTimestampedFileSplit(split, modificationTime, null);
-       }
-
-       private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime, Serializable state) {
-               TimestampedFileInputSplit timestampedSplit = new TimestampedFileInputSplit(
-                       modificationTime, split.getSplitNumber(), split.getPath(),
-                       split.getStart(), split.getLength(), split.getHostnames());
-
-               if (state != null) {
-                       timestampedSplit.setSplitState(state);
-               }
-               return timestampedSplit;
-       }
 }
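
The queue-plus-reader-thread architecture described in the class javadoc can be illustrated generically. This is a standalone JDK-only sketch of the idea, not the operator's actual implementation:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SplitQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> splits = new LinkedBlockingQueue<>();

        // the reader thread consumes split descriptors at its own pace,
        // decoupled from the thread that enqueues them (and emits barriers)
        Thread reader = new Thread(() -> {
            try {
                while (true) {
                    System.out.println("reading " + splits.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
        reader.start();

        splits.offer("split-0");
        splits.offer("split-1");
        Thread.sleep(200); // let the daemon thread drain the queue before exiting
    }
}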

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index abaa74e..884b899 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -116,7 +115,7 @@ public class StreamingJobGraphGenerator {
        private StreamingJobGraphGenerator(StreamGraph streamGraph) {
                this.streamGraph = streamGraph;
                this.defaultStreamGraphHasher = new StreamGraphHasherV2();
-               this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
+               this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
 
                this.jobVertices = new HashMap<>();
                this.builtVertices = new HashSet<>();
@@ -241,14 +240,14 @@ public class StreamingJobGraphGenerator {
                                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
                        }
 
-                       List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId);
-                       if (operatorHashes == null) {
-                               operatorHashes = new ArrayList<>();
-                               chainedOperatorHashes.put(startNodeId, operatorHashes);
-                       }
+                       List<Tuple2<byte[], byte[]>> operatorHashes =
+                               chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
 
                        byte[] primaryHashBytes = hashes.get(currentNodeId);
-                       operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHashes.get(1).get(currentNodeId)));
+
+                       for (Map<Integer, byte[]> legacyHash : legacyHashes) {
+                               operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
+                       }
 
                        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
                        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
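
Besides dropping the V1 hasher, the hunk above also swaps a get-then-put idiom for Map.computeIfAbsent; the two forms are equivalent. A standalone illustration with plain java.util collections:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentSketch {
    public static void main(String[] args) {
        Map<Integer, List<String>> byNode = new HashMap<>();

        // before: explicit null check and put
        List<String> hashes = byNode.get(1);
        if (hashes == null) {
            hashes = new ArrayList<>();
            byNode.put(1, hashes);
        }
        hashes.add("hash-a");

        // after: the mapping function runs only when the key is absent
        byNode.computeIfAbsent(2, k -> new ArrayList<>()).add("hash-b");

        System.out.println(byNode);
    }
}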

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a72b9fe..a28fc30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
@@ -55,7 +54,6 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -251,42 +249,6 @@ public abstract class AbstractStreamOperator<OUT>
                                getContainingTask().getCancelables()); // access to register streams for canceling
 
                initializeState(initializationContext);
-
-               if (restoring) {
-
-                       // finally restore the legacy state in case we are
-                       // migrating from a previous Flink version.
-
-                       restoreStreamCheckpointed(stateHandles);
-               }
-       }
-
-       /**
-        * @deprecated Non-repartitionable operator state that has been deprecated.
-        * Can be removed when we remove the APIs for non-repartitionable operator state.
-        */
-       @Deprecated
-       private void restoreStreamCheckpointed(OperatorSubtaskState stateHandles) throws Exception {
-               StreamStateHandle state = stateHandles.getLegacyOperatorState();
-               if (null != state) {
-                       if (this instanceof CheckpointedRestoringOperator) {
-
-                               LOG.debug("Restore state of task {} in operator with id ({}).",
-                                       getContainingTask().getName(), getOperatorID());
-
-                               FSDataInputStream is = state.openInputStream();
-                               try {
-                                       getContainingTask().getCancelables().registerClosable(is);
-                                       ((CheckpointedRestoringOperator) this).restoreState(is);
-                               } finally {
-                                       getContainingTask().getCancelables().unregisterClosable(is);
-                                       is.close();
-                               }
-                       } else {
-                               throw new Exception(
-                                               "Found legacy operator state for operator that does not implement StreamCheckpointedOperator.");
-                       }
-               }
        }
 
        /**
@@ -451,35 +413,6 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        /**
-        * @deprecated Non-repartitionable operator state that has been deprecated.
-        * Can be removed when we remove the APIs for non-repartitionable operator state.
-        */
-       @SuppressWarnings("deprecation")
-       @Deprecated
-       @Override
-       public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
-               if (this instanceof StreamCheckpointedOperator) {
-                       CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
-
-                       final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-                               factory.createCheckpointStateOutputStream(checkpointId, timestamp);
-
-                       getContainingTask().getCancelables().registerClosable(outStream);
-
-                       try {
-                               ((StreamCheckpointedOperator) this).snapshotState(outStream, checkpointId, timestamp);
-                               return outStream.closeAndGetHandle();
-                       }
-                       finally {
-                               getContainingTask().getCancelables().unregisterClosable(outStream);
-                               outStream.close();
-                       }
-               } else {
-                       return null;
-               }
-       }
-
-       /**
         * Stream operators with state which can be restored need to override this hook method.
         *
         * @param context context that allows to register different states.
