This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0114338  [FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
0114338 is described below

commit 0114338da5ce52677d1dfa1ab4350b1567dc3522
Author: edu05 <eduardo.winpe...@gmail.com>
AuthorDate: Wed Apr 29 19:36:08 2020 +0200

    [FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
---
 .../state/api/input/OperatorStateInputFormat.java  |  3 +-
 .../state/api/runtime/OperatorIDGeneratorTest.java |  2 +-
 .../org/apache/flink/runtime/OperatorIDPair.java   | 58 +++++++++++++++
 .../flink/runtime/checkpoint/Checkpoints.java      | 17 +----
 .../runtime/checkpoint/PendingCheckpoint.java      | 13 ++--
 .../checkpoint/StateAssignmentOperation.java       | 35 ++++-----
 .../runtime/executiongraph/ExecutionJobVertex.java | 87 ++--------------------
 .../runtime/jobgraph/InputOutputFormatVertex.java  |  7 +-
 .../apache/flink/runtime/jobgraph/JobVertex.java   | 49 ++++--------
 .../ChannelStateNoRescalingPartitionerTest.java    |  3 +-
 .../CheckpointCoordinatorRestoringTest.java        | 23 +++---
 .../checkpoint/CheckpointCoordinatorTest.java      |  2 +-
 .../CheckpointCoordinatorTestingUtils.java         | 18 +++--
 .../checkpoint/CheckpointMetadataLoadingTest.java  |  3 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |  4 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  3 +-
 .../checkpoint/StateAssignmentOperationTest.java   | 54 ++++++++++----
 .../executiongraph/LegacyJobVertexIdTest.java      | 64 ----------------
 .../api/graph/StreamingJobGraphGenerator.java      | 24 ++----
 .../StreamingJobGraphGeneratorNodeHashTest.java    | 23 ------
 20 files changed, 191 insertions(+), 301 deletions(-)
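
For context, the fix replaces the two parallel ID lists (generated and
user-defined) with a single OperatorIDPair per operator, and makes the
completeness check aware of both. A rough before/after sketch of the
ID-collection step, paraphrasing the StateAssignmentOperation hunk further
below (names as in the diff):

    // before: only generated operator IDs were visible to the check
    allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());

    // after: the generated ID and, when set, the user-defined ID are registered
    for (OperatorIDPair pair : executionJobVertex.getOperatorIDs()) {
            allOperatorIDs.add(pair.getGeneratedOperatorID());
            pair.getUserDefinedOperatorID().ifPresent(allOperatorIDs::add);
    }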

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
index a312938..ec4c9b1 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -123,7 +124,7 @@ abstract class OperatorStateInputFormat<OT> extends RichInputFormat<OT, Operator
                Map<OperatorInstanceID, List<OperatorStateHandle>> newManagedOperatorStates = reDistributePartitionableStates(
                        singletonList(operatorState),
                        minNumSplits,
-                       singletonList(operatorState.getOperatorID()),
+                       singletonList(OperatorIDPair.generatedIDOnly(operatorState.getOperatorID())),
                        OperatorSubtaskState::getManagedOperatorState,
                        RoundRobinOperatorStateRepartitioner.INSTANCE);
 
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
index 9dff2a1..cb90641 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
@@ -65,6 +65,6 @@ public class OperatorIDGeneratorTest {
                        .findFirst()
                        .orElseThrow(() -> new IllegalStateException("Unable to find vertex"));
 
-               return vertex.getOperatorIDs().get(0);
+               return vertex.getOperatorIDs().get(0).getGeneratedOperatorID();
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
new file mode 100644
index 0000000..e6070d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Formed of a mandatory generated operator ID and an optional user-defined operator ID.
+ */
+public class OperatorIDPair implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final OperatorID generatedOperatorID;
+       private final OperatorID userDefinedOperatorID;
+
+       private OperatorIDPair(OperatorID generatedOperatorID, @Nullable OperatorID userDefinedOperatorID) {
+               this.generatedOperatorID = generatedOperatorID;
+               this.userDefinedOperatorID = userDefinedOperatorID;
+       }
+
+       public static OperatorIDPair of(OperatorID generatedOperatorID, @Nullable OperatorID userDefinedOperatorID) {
+               return new OperatorIDPair(generatedOperatorID, userDefinedOperatorID);
+       }
+
+       public static OperatorIDPair generatedIDOnly(OperatorID generatedOperatorID) {
+               return new OperatorIDPair(generatedOperatorID, null);
+       }
+
+       public OperatorID getGeneratedOperatorID() {
+               return generatedOperatorID;
+       }
+
+       public Optional<OperatorID> getUserDefinedOperatorID() {
+               return Optional.ofNullable(userDefinedOperatorID);
+       }
+}
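
A quick usage sketch of the new type (the concrete IDs here are hypothetical,
not part of this commit):

    // an operator known only by its generated ID
    OperatorIDPair generatedOnly = OperatorIDPair.generatedIDOnly(new OperatorID());

    // an operator that additionally carries a user-defined ID
    OperatorIDPair withUserID = OperatorIDPair.of(new OperatorID(), new OperatorID());

    // the generated ID is always present; the user-defined one is an Optional
    OperatorID id = withUserID.getUserDefinedOperatorID().orElse(withUserID.getGeneratedOperatorID());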
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 17db5e6..37f96f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
 import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializers;
@@ -137,28 +138,18 @@ public class Checkpoints {
                // generate mapping from operator to task
                Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
                for (ExecutionJobVertex task : tasks.values()) {
-                       for (OperatorID operatorID : task.getOperatorIDs()) {
-                               operatorToJobVertexMapping.put(operatorID, task);
+                       for (OperatorIDPair operatorIDPair : task.getOperatorIDs()) {
+                               operatorToJobVertexMapping.put(operatorIDPair.getGeneratedOperatorID(), task);
+                               operatorIDPair.getUserDefinedOperatorID().ifPresent(id -> operatorToJobVertexMapping.put(id, task));
                        }
                }
 
                // (2) validate it (parallelism, etc)
-               boolean expandedToLegacyIds = false;
-
                HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(checkpointMetadata.getOperatorStates().size());
                for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
 
                        ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
 
-                       // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-                       // for example as generated from older flink versions, to provide backwards compatibility.
-                       if (executionJobVertex == null && !expandedToLegacyIds) {
-                               operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
-                               executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
-                               expandedToLegacyIds = true;
-                               LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
-                       }
-
                        if (executionJobVertex != null) {
 
                                if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
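
With this change, Checkpoints.java registers each task under both of its
operator IDs up front, so the lazy includeAlternativeOperatorIDs expansion and
its expandedToLegacyIds flag become unnecessary. A sketch of the resulting
lookup behavior, assuming a pair registered as in the new loop above:

    // both lookups now resolve to the same ExecutionJobVertex
    ExecutionJobVertex byGeneratedId = operatorToJobVertexMapping.get(pair.getGeneratedOperatorID());
    ExecutionJobVertex byUserDefinedId = pair.getUserDefinedOperatorID()
            .map(operatorToJobVertexMapping::get)
            .orElse(byGeneratedId);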
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 52844f8..6ccdc53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -371,31 +372,31 @@ public class PendingCheckpoint {
                                acknowledgedTasks.add(executionAttemptId);
                        }
 
-                       List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
+                       List<OperatorIDPair> operatorIDs = vertex.getJobVertex().getOperatorIDs();
                        int subtaskIndex = vertex.getParallelSubtaskIndex();
                        long ackTimestamp = System.currentTimeMillis();
 
                        long stateSize = 0L;
 
                        if (operatorSubtaskStates != null) {
-                               for (OperatorID operatorID : operatorIDs) {
+                               for (OperatorIDPair operatorID : operatorIDs) {
 
                                        OperatorSubtaskState operatorSubtaskState =
-                                               operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);
+                                               operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID.getGeneratedOperatorID());
 
                                        // if no real operatorSubtaskState was reported, we insert an empty state
                                        if (operatorSubtaskState == null) {
                                                operatorSubtaskState = new OperatorSubtaskState();
                                        }
 
-                                       OperatorState operatorState = operatorStates.get(operatorID);
+                                       OperatorState operatorState = operatorStates.get(operatorID.getGeneratedOperatorID());
 
                                        if (operatorState == null) {
                                                operatorState = new OperatorState(
-                                                       operatorID,
+                                                       operatorID.getGeneratedOperatorID(),
                                                        vertex.getTotalNumberOfParallelSubtasks(),
                                                        vertex.getMaxParallelism());
-                                               operatorStates.put(operatorID, operatorState);
+                                               operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState);
                                        }
 
                                        operatorState.putState(subtaskIndex, operatorSubtaskState);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 7f49828..9644430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -86,14 +87,11 @@ public class StateAssignmentOperation {
                for (ExecutionJobVertex executionJobVertex : this.tasks) {
 
                        // find the states of all operators belonging to this task
-                       List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
-                       List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
-                       List<OperatorState> operatorStates = new ArrayList<>(operatorIDs.size());
+                       List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
+                       List<OperatorState> operatorStates = new ArrayList<>(operatorIDPairs.size());
                        boolean statelessTask = true;
-                       for (int x = 0; x < operatorIDs.size(); x++) {
-                               OperatorID operatorID = altOperatorIDs.get(x) == null
-                                       ? operatorIDs.get(x)
-                                       : altOperatorIDs.get(x);
+                       for (OperatorIDPair operatorIDPair : operatorIDPairs) {
+                               OperatorID operatorID = operatorIDPair.getUserDefinedOperatorID().orElse(operatorIDPair.getGeneratedOperatorID());
 
                                OperatorState operatorState = localOperators.remove(operatorID);
                                if (operatorState == null) {
@@ -115,7 +113,7 @@ public class StateAssignmentOperation {
 
        private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
 
-               List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+               List<OperatorIDPair> operatorIDs = executionJobVertex.getOperatorIDs();
 
                //1. first compute the new parallelism
                checkParallelismPreconditions(operatorStates, executionJobVertex);
@@ -214,7 +212,7 @@ public class StateAssignmentOperation {
                        Map<OperatorInstanceID, List<KeyedStateHandle>> subRawKeyedState,
                        int newParallelism) {
 
-               List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+               List<OperatorIDPair> operatorIDs = executionJobVertex.getOperatorIDs();
 
                for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
 
@@ -224,8 +222,8 @@ public class StateAssignmentOperation {
                        TaskStateSnapshot taskState = new TaskStateSnapshot(operatorIDs.size());
                        boolean statelessTask = true;
 
-                       for (OperatorID operatorID : operatorIDs) {
-                               OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID);
+                       for (OperatorIDPair operatorID : operatorIDs) {
+                               OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID.getGeneratedOperatorID());
 
                                OperatorSubtaskState operatorSubtaskState = operatorSubtaskStateFrom(
                                        instanceID,
@@ -239,7 +237,7 @@ public class StateAssignmentOperation {
                                if (operatorSubtaskState.hasState()) {
                                        statelessTask = false;
                                }
-                               taskState.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+                               taskState.putSubtaskStateByOperatorID(operatorID.getGeneratedOperatorID(), operatorSubtaskState);
                        }
 
                        if (!statelessTask) {
@@ -288,7 +286,7 @@ public class StateAssignmentOperation {
        private void reDistributeKeyedStates(
                        List<OperatorState> oldOperatorStates,
                        int newParallelism,
-                       List<OperatorID> newOperatorIDs,
+                       List<OperatorIDPair> newOperatorIDs,
                        List<KeyGroupRange> newKeyGroupPartitions,
                        Map<OperatorInstanceID, List<KeyedStateHandle>> newManagedKeyedState,
                        Map<OperatorInstanceID, List<KeyedStateHandle>> newRawKeyedState) {
@@ -300,7 +298,7 @@ public class StateAssignmentOperation {
                        OperatorState operatorState = oldOperatorStates.get(operatorIndex);
                        int oldParallelism = operatorState.getParallelism();
                        for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
-                               OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
+                               OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex).getGeneratedOperatorID());
                                Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
                                        operatorState,
                                        newKeyGroupPartitions,
@@ -347,7 +345,7 @@ public class StateAssignmentOperation {
        public static <T extends StateObject> Map<OperatorInstanceID, List<T>> reDistributePartitionableStates(
                        List<OperatorState> oldOperatorStates,
                        int newParallelism,
-                       List<OperatorID> newOperatorIDs,
+                       List<OperatorIDPair> newOperatorIDs,
                        Function<OperatorSubtaskState, StateObjectCollection<T>> extractHandle,
                        OperatorStateRepartitioner<T> stateRepartitioner) {
 
@@ -361,7 +359,7 @@ public class StateAssignmentOperation {
                Map<OperatorInstanceID, List<T>> result = new HashMap<>();
                for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) {
                        result.putAll(applyRepartitioner(
-                               newOperatorIDs.get(operatorIndex),
+                               newOperatorIDs.get(operatorIndex).getGeneratedOperatorID(),
                                stateRepartitioner,
                                oldStates.get(operatorIndex),
                                oldOperatorStates.get(operatorIndex).getParallelism(),
@@ -555,7 +553,10 @@ public class StateAssignmentOperation {
 
                Set<OperatorID> allOperatorIDs = new HashSet<>();
                for (ExecutionJobVertex executionJobVertex : tasks) {
-                       allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
+                       for (OperatorIDPair operatorIDPair : executionJobVertex.getOperatorIDs()) {
+                               allOperatorIDs.add(operatorIDPair.getGeneratedOperatorID());
+                               operatorIDPair.getUserDefinedOperatorID().ifPresent(allOperatorIDs::add);
+                       }
                }
                for (Map.Entry<OperatorID, OperatorState> operatorGroupStateEntry : operatorStates.entrySet()) {
                        OperatorState operatorState = operatorGroupStateEntry.getValue();
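
Note the restore-side convention in the hunks above: when an operator has a
user-defined ID, that ID takes precedence over the generated one for matching
old state. A single orElse call now expresses what the old parallel-list null
check did, and the new test in StateAssignmentOperationTest further below
exercises exactly this path:

    OperatorID restoreId = pair.getUserDefinedOperatorID().orElse(pair.getGeneratedOperatorID());
    OperatorState operatorState = localOperators.remove(restoreId);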
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9cca608..ee2cd2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -91,24 +92,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
        private final JobVertex jobVertex;
 
-       /**
-        * The IDs of all operators contained in this execution job vertex.
-        *
-        * <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
-        *  A - B - D
-        *   \    \
-        *    C    E
-        * This is the same order that operators are stored in the {@code StreamTask}.
-        */
-       private final List<OperatorID> operatorIDs;
-
-       /**
-        * The alternative IDs of all operators contained in this execution job vertex.
-        *
-        * <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
-        */
-       private final List<OperatorID> userDefinedOperatorIds;
-
        private final ExecutionVertex[] taskVertices;
 
        private final IntermediateResult[] producedDataSets;
@@ -200,8 +183,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
                this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
 
                this.taskVertices = new ExecutionVertex[numTaskVertices];
-               this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
-               this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
 
                this.inputs = new ArrayList<>(jobVertex.getInputs().size());
 
@@ -289,21 +270,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
        }
 
        /**
-        * Returns a list containing the IDs of all operators contained in this execution job vertex.
+        * Returns a list containing the ID pairs of all operators contained in this execution job vertex.
         *
-        * @return list containing the IDs of all contained operators
+        * @return list containing the ID pairs of all contained operators
         */
-       public List<OperatorID> getOperatorIDs() {
-               return operatorIDs;
-       }
-
-       /**
-        * Returns a list containing the alternative IDs of all operators contained in this execution job vertex.
-        *
-        * @return list containing alternative the IDs of all contained operators
-        */
-       public List<OperatorID> getUserDefinedOperatorIDs() {
-               return userDefinedOperatorIds;
+       public List<OperatorIDPair> getOperatorIDs() {
+               return jobVertex.getOperatorIDs();
        }
 
        public void setMaxParallelism(int maxParallelismDerived) {
@@ -626,53 +598,4 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
                        return ExecutionState.CREATED;
                }
        }
-
-       public static Map<JobVertexID, ExecutionJobVertex> includeLegacyJobVertexIDs(
-                       Map<JobVertexID, ExecutionJobVertex> tasks) {
-
-               Map<JobVertexID, ExecutionJobVertex> expanded = new HashMap<>(2 * tasks.size());
-               // first include all new ids
-               expanded.putAll(tasks);
-
-               // now expand and add legacy ids
-               for (ExecutionJobVertex executionJobVertex : tasks.values()) {
-                       if (null != executionJobVertex) {
-                               JobVertex jobVertex = executionJobVertex.getJobVertex();
-                               if (null != jobVertex) {
-                                       List<JobVertexID> alternativeIds = jobVertex.getIdAlternatives();
-                                       for (JobVertexID jobVertexID : alternativeIds) {
-                                               ExecutionJobVertex old = expanded.put(jobVertexID, executionJobVertex);
-                                               Preconditions.checkState(null == old || old.equals(executionJobVertex),
-                                                               "Ambiguous jobvertex id detected during expansion to legacy ids.");
-                                       }
-                               }
-                       }
-               }
-
-               return expanded;
-       }
-
-       public static Map<OperatorID, ExecutionJobVertex> includeAlternativeOperatorIDs(
-                       Map<OperatorID, ExecutionJobVertex> operatorMapping) {
-
-               Map<OperatorID, ExecutionJobVertex> expanded = new HashMap<>(2 * operatorMapping.size());
-               // first include all existing ids
-               expanded.putAll(operatorMapping);
-
-               // now expand and add user-defined ids
-               for (ExecutionJobVertex executionJobVertex : operatorMapping.values()) {
-                       if (executionJobVertex != null) {
-                               JobVertex jobVertex = executionJobVertex.getJobVertex();
-                               if (jobVertex != null) {
-                                       for (OperatorID operatorID : jobVertex.getUserDefinedOperatorIDs()) {
-                                               if (operatorID != null) {
-                                                       expanded.put(operatorID, executionJobVertex);
-                                               }
-                                       }
-                               }
-                       }
-               }
-
-               return expanded;
-       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
index 741df52..f1e7a2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 
 import java.util.HashMap;
@@ -48,11 +49,9 @@ public class InputOutputFormatVertex extends JobVertex {
        public InputOutputFormatVertex(
                String name,
                JobVertexID id,
-               List<JobVertexID> alternativeIds,
-               List<OperatorID> operatorIds,
-               List<OperatorID> alternativeOperatorIds) {
+               List<OperatorIDPair> operatorIDPairs) {
 
-               super(name, id, alternativeIds, operatorIds, alternativeOperatorIds);
+               super(name, id, operatorIDPairs);
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index e9616a7..c77103d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -55,14 +56,15 @@ public class JobVertex implements java.io.Serializable {
        /** The ID of the vertex. */
        private final JobVertexID id;
 
-       /** The alternative IDs of the vertex. */
-       private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
-
-       /** The IDs of all operators contained in this vertex. */
-       private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();
-
-       /** The alternative IDs of all operators contained in this vertex. */
-       private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();
+       /** The IDs of all operators contained in this vertex.
+        *
+        * <p>The ID pairs are stored depth-first post-order; for the forking chain below, the IDs would be stored as [D, E, B, C, A].
+        *  A - B - D
+        *   \    \
+        *    C    E
+        * This is the same order that operators are stored in the {@code StreamTask}.
+        */
+       private final List<OperatorIDPair> operatorIDs;
 
        /** List of produced data sets, one per writer. */
        private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
@@ -143,9 +145,8 @@ public class JobVertex implements java.io.Serializable {
        public JobVertex(String name, JobVertexID id) {
                this.name = name == null ? DEFAULT_NAME : name;
                this.id = id == null ? new JobVertexID() : id;
-               // the id lists must have the same size
-               this.operatorIDs.add(OperatorID.fromJobVertexID(this.id));
-               this.operatorIdsAlternatives.add(null);
+               OperatorIDPair operatorIDPair = OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(this.id));
+               this.operatorIDs = Collections.singletonList(operatorIDPair);
        }
 
        /**
@@ -153,17 +154,12 @@ public class JobVertex implements java.io.Serializable {
         *
         * @param name The name of the new job vertex.
         * @param primaryId The id of the job vertex.
-        * @param alternativeIds The alternative ids of the job vertex.
-        * @param operatorIds The ids of all operators contained in this job vertex.
-        * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
+        * @param operatorIDPairs The operator ID pairs of the job vertex.
         */
-       public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {
-               Preconditions.checkArgument(operatorIds.size() == alternativeOperatorIds.size());
+       public JobVertex(String name, JobVertexID primaryId, 
List<OperatorIDPair> operatorIDPairs) {
                this.name = name == null ? DEFAULT_NAME : name;
                this.id = primaryId == null ? new JobVertexID() : primaryId;
-               this.idAlternatives.addAll(alternativeIds);
-               this.operatorIDs.addAll(operatorIds);
-               this.operatorIdsAlternatives.addAll(alternativeOperatorIds);
+               this.operatorIDs = Collections.unmodifiableList(operatorIDPairs);
        }
 
        // --------------------------------------------------------------------------------------------
@@ -178,15 +174,6 @@ public class JobVertex implements java.io.Serializable {
        }
 
        /**
-        * Returns a list of all alternative IDs of this job vertex.
-        *
-        * @return List of all alternative IDs for this job vertex
-        */
-       public List<JobVertexID> getIdAlternatives() {
-               return idAlternatives;
-       }
-
-       /**
         * Returns the name of the vertex.
         *
         * @return The name of the vertex.
@@ -222,14 +209,10 @@ public class JobVertex implements java.io.Serializable {
                return this.inputs.size();
        }
 
-       public List<OperatorID> getOperatorIDs() {
+       public List<OperatorIDPair> getOperatorIDs() {
                return operatorIDs;
        }
 
-       public List<OperatorID> getUserDefinedOperatorIDs() {
-               return operatorIdsAlternatives;
-       }
-
        /**
         * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
         *
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
index a7763cb..91ae0ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -101,7 +102,7 @@ public class ChannelStateNoRescalingPartitionerTest {
                        StateAssignmentOperation.reDistributePartitionableStates(
                                singletonList(state),
                                newParallelism,
-                               singletonList(OPERATOR_ID),
+                               singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID)),
                                (Function<OperatorSubtaskState, StateObjectCollection<T>>) this.extractState,
                                channelStateNonRescalingRepartitioner("test"));
                } catch (IllegalArgumentException e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index d5acd88..0f03875 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -506,7 +507,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                List<List<Collection<OperatorStateHandle>>> actualOpStatesRaw = new ArrayList<>(newJobVertex2.getParallelism());
                for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
 
-                       List<OperatorID> operatorIDs = newJobVertex2.getOperatorIDs();
+                       List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
 
                        KeyGroupsStateHandle originalKeyedStateBackend = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), false);
                        KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), true);
@@ -520,7 +521,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                        List<Collection<OperatorStateHandle>> allParallelRawOpStates = new ArrayList<>(operatorIDs.size());
 
                        for (int idx = 0; idx < operatorIDs.size(); ++idx) {
-                               OperatorID operatorID = operatorIDs.get(idx);
+                               OperatorID operatorID = operatorIDs.get(idx).getGeneratedOperatorID();
                                OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID);
                                Collection<OperatorStateHandle> opStateBackend = opState.getManagedOperatorState();
                                Collection<OperatorStateHandle> opStateRaw = opState.getRawOperatorState();
@@ -827,13 +828,13 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
                for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
 
-                       final List<OperatorID> operatorIds = newJobVertex1.getOperatorIDs();
+                       final List<OperatorIDPair> operatorIDs = newJobVertex1.getOperatorIDs();
 
                        JobManagerTaskRestore taskRestore = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
                        Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
                        TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
-                       OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
+                       OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
                        assertTrue(headOpState.getManagedKeyedState().isEmpty());
                        assertTrue(headOpState.getRawKeyedState().isEmpty());
 
@@ -841,7 +842,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                        {
                                int operatorIndexInChain = 2;
                                OperatorSubtaskState opState =
-                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
                                assertTrue(opState.getManagedOperatorState().isEmpty());
                                assertTrue(opState.getRawOperatorState().isEmpty());
@@ -850,7 +851,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                        {
                                int operatorIndexInChain = 1;
                                OperatorSubtaskState opState =
-                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
                                OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle(
                                        id1.f0, i, 2, 8, false);
@@ -871,7 +872,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                        {
                                int operatorIndexInChain = 0;
                                OperatorSubtaskState opState =
-                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
                                OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle(
                                        id2.f0, i, 2, 8, false);
@@ -895,7 +896,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
                for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
 
-                       final List<OperatorID> operatorIds = newJobVertex2.getOperatorIDs();
+                       final List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
 
                        JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
                        Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
@@ -905,7 +906,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                        {
                                int operatorIndexInChain = 1;
                                OperatorSubtaskState opState =
-                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
                                List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = new ArrayList<>(1);
                                actualSubManagedOperatorState.add(opState.getManagedOperatorState());
@@ -921,7 +922,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                        {
                                int operatorIndexInChain = 0;
                                OperatorSubtaskState opState =
-                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+                                       stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
                                assertTrue(opState.getManagedOperatorState().isEmpty());
                                assertTrue(opState.getRawOperatorState().isEmpty());
 
@@ -931,7 +932,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                        KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), true);
 
                        OperatorSubtaskState headOpState =
-                               stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
+                               stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
 
                        Collection<KeyedStateHandle> keyedStateBackend = headOpState.getManagedKeyedState();
                        Collection<KeyedStateHandle> keyGroupStateRaw = headOpState.getRawKeyedState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ce662f6..d0df7e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2363,7 +2363,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
                        Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>();
 
-                       opStates.put(jobVertex1.getOperatorIDs().get(0), operatorSubtaskState);
+                       opStates.put(jobVertex1.getOperatorIDs().get(0).getGeneratedOperatorID(), operatorSubtaskState);
 
                        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 95f7b58..7b1ff45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -65,7 +66,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -357,8 +357,11 @@ public class CheckpointCoordinatorTestingUtils {
                when(executionJobVertex.getParallelism()).thenReturn(parallelism);
                when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
                when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true);
-               when(executionJobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
-               when(executionJobVertex.getUserDefinedOperatorIDs()).thenReturn(Arrays.asList(new OperatorID[jobVertexIDs.size()]));
+               List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
+               for (OperatorID operatorID : jobVertexIDs) {
+                       operatorIDPairs.add(OperatorIDPair.generatedIDOnly(operatorID));
+               }
+               when(executionJobVertex.getOperatorIDs()).thenReturn(operatorIDPairs);
 
                return executionJobVertex;
        }
@@ -455,7 +458,11 @@ public class CheckpointCoordinatorTestingUtils {
                when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
 
                ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
+               List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
+               for (OperatorID operatorID : jobVertexIDs) {
+                       operatorIDPairs.add(OperatorIDPair.generatedIDOnly(operatorID));
+               }
+               when(jobVertex.getOperatorIDs()).thenReturn(operatorIDPairs);
 
                when(vertex.getJobVertex()).thenReturn(jobVertex);
 
@@ -560,8 +567,7 @@ public class CheckpointCoordinatorTestingUtils {
                when(vertex.getMaxParallelism()).thenReturn(vertices.length);
                when(vertex.getJobVertexId()).thenReturn(id);
                when(vertex.getTaskVertices()).thenReturn(vertices);
-               when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
-               when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+               when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));
 
                for (ExecutionVertex v : vertices) {
                        when(v.getJobVertex()).thenReturn(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 6d12b14..fc93b2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -104,7 +105,7 @@ public class CheckpointMetadataLoadingTest {
                ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
                when(vertex.getParallelism()).thenReturn(parallelism);
                when(vertex.getMaxParallelism()).thenReturn(parallelism);
-               when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
+               when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)));
 
                Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
                tasks.put(jobVertexID, vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 958f3f1..bd7f2ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -312,8 +313,7 @@ public class CheckpointStateRestoreTest {
                when(vertex.getMaxParallelism()).thenReturn(vertices.length);
                when(vertex.getJobVertexId()).thenReturn(id);
                when(vertex.getTaskVertices()).thenReturn(vertices);
-               when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
-               when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+               when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));
 
                for (ExecutionVertex v : vertices) {
                        when(v.getJobVertex()).thenReturn(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index f172e51..5afcc73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
@@ -87,7 +88,7 @@ public class PendingCheckpointTest {
 
        static {
                ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(new OperatorID()));
+               when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(new OperatorID())));
 
                ExecutionVertex vertex = mock(ExecutionVertex.class);
                when(vertex.getMaxParallelism()).thenReturn(128);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 5d12738..7fed3cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -49,7 +50,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewKeyedStateHandle;
@@ -173,7 +173,7 @@ public class StateAssignmentOperationTest extends TestLogger {
                        StateAssignmentOperation.reDistributePartitionableStates(
                                Collections.singletonList(operatorState),
                                newParallelism,
-                               Collections.singletonList(operatorID),
+                               Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)),
                                OperatorSubtaskState::getManagedOperatorState,
                                RoundRobinOperatorStateRepartitioner.INSTANCE
                        );
@@ -324,6 +324,21 @@ public class StateAssignmentOperationTest extends TestLogger {
                }
        }
 
+       @Test
+       public void assigningStatesShouldWorkWithUserDefinedOperatorIdsAsWell() throws JobException, JobExecutionException {
+               int numSubTasks = 1;
+               OperatorID operatorId = new OperatorID();
+               OperatorID userDefinedOperatorId = new OperatorID();
+               Set<OperatorID> operatorIds = Collections.singleton(userDefinedOperatorId);
+
+               ExecutionJobVertex executionJobVertex = buildExecutionJobVertex(operatorId, userDefinedOperatorId, 1);
+               Map<OperatorID, OperatorState> states = buildOperatorStates(operatorIds, numSubTasks);
+
+               new StateAssignmentOperation(0, Collections.singleton(executionJobVertex), states, false).assignStates();
+
+               Assert.assertEquals(states.get(userDefinedOperatorId).getState(0), getAssignedState(executionJobVertex, operatorId, 0));
+       }
+
        private Set<OperatorID> buildOperatorIds(int operators) {
                Set<OperatorID> set = new HashSet<>();
                for (int j = 0; j < operators; j++) {
@@ -349,21 +364,28 @@ public class StateAssignmentOperationTest extends TestLogger {
                }));
        }
 
-       private Map<OperatorID, ExecutionJobVertex> buildVertices(Set<OperatorID> operators, int parallelism) throws JobException, JobExecutionException {
+       private Map<OperatorID, ExecutionJobVertex> buildVertices(Set<OperatorID> operators, int parallelism) {
+               return operators.stream()
+                       .collect(Collectors.toMap(Function.identity(), operatorID -> {
+                               try {
+                                       return buildExecutionJobVertex(operatorID, parallelism);
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }));
+       }
+
+       private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, int parallelism) throws JobException, JobExecutionException {
+               return buildExecutionJobVertex(operatorID, operatorID, parallelism);
+       }
+
+       private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, OperatorID userDefinedOperatorId, int parallelism) throws JobException, JobExecutionException {
                ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder().build();
-               return operators.stream().collect(Collectors.toMap(Function.identity(), operatorID -> {
-                       JobVertex jobVertex = new JobVertex(
-                               operatorID.toHexString(),
-                               new JobVertexID(),
-                               emptyList(),
-                               singletonList(operatorID),
-                               singletonList(operatorID));
-                       try {
-                               return new ExecutionJobVertex(graph, jobVertex, parallelism, 1, Time.seconds(1), 1L, 1L);
-                       } catch (Exception e) {
-                               throw new RuntimeException(e);
-                       }
-               }));
+               JobVertex jobVertex = new JobVertex(
+                       operatorID.toHexString(),
+                       new JobVertexID(),
+                       singletonList(OperatorIDPair.of(operatorID, userDefinedOperatorId)));
+               return new ExecutionJobVertex(graph, jobVertex, parallelism, 1, Time.seconds(1), 1L, 1L);
        }
 
        private OperatorSubtaskState getAssignedState(ExecutionJobVertex executionJobVertex, OperatorID operatorId, int subtaskIdx) {
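The new test above captures the FLINK-16638 fix itself: state snapshotted under a user-defined operator ID must still be assigned to the vertex whose generated ID differs from it. A hedged sketch of that matching rule; the OperatorIDPair accessor names are assumptions, since this mail only shows the factories:

    // True if a restored OperatorState belongs to the pair: its ID may match
    // either the generated half or the optional user-defined half.
    static boolean belongsTo(OperatorState state, OperatorIDPair pair) {
            OperatorID id = state.getOperatorID();
            return id.equals(pair.getGeneratedOperatorID())
                    || pair.getUserDefinedOperatorID().map(id::equals).orElse(false);
    }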
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
deleted file mode 100644
index d70b524..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class LegacyJobVertexIdTest {
-
-       @Test
-       public void testIntroduceLegacyJobVertexIds() throws Exception {
-               JobVertexID defaultId = new JobVertexID();
-               JobVertexID legacyId1 = new JobVertexID();
-               JobVertexID legacyId2 = new JobVertexID();
-
-               JobVertex jobVertex = new JobVertex("test", defaultId, Arrays.asList(legacyId1, legacyId2), new ArrayList<>(), new ArrayList<>());
-               jobVertex.setInvokableClass(AbstractInvokable.class);
-
-               ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder().build();
-
-               ExecutionJobVertex executionJobVertex =
-                               new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1));
-
-               Map<JobVertexID, ExecutionJobVertex> idToVertex = new HashMap<>();
-               idToVertex.put(executionJobVertex.getJobVertexId(), executionJobVertex);
-
-               Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
-               Assert.assertNull(idToVertex.get(legacyId1));
-               Assert.assertNull(idToVertex.get(legacyId2));
-
-               idToVertex = ExecutionJobVertex.includeLegacyJobVertexIDs(idToVertex);
-
-               Assert.assertEquals(3, idToVertex.size());
-               Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
-               Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1));
-               Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2));
-       }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1065568..52d67f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -408,21 +409,12 @@ public class StreamingJobGraphGenerator {
 
                JobVertexID jobVertexId = new JobVertexID(hash);
 
-               List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
-               for (Map<Integer, byte[]> legacyHash : legacyHashes) {
-                       hash = legacyHash.get(streamNodeId);
-                       if (null != hash) {
-                               legacyJobVertexIds.add(new JobVertexID(hash));
-                       }
-               }
-
                List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
-               List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
-               List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
+               List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
                if (chainedOperators != null) {
                        for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
-                               chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
-                               userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
+                               OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
+                               operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
                        }
                }
 
@@ -430,9 +422,7 @@ public class StreamingJobGraphGenerator {
                        jobVertex = new InputOutputFormatVertex(
                                        chainedNames.get(streamNodeId),
                                        jobVertexId,
-                                       legacyJobVertexIds,
-                                       chainedOperatorVertexIds,
-                                       userDefinedChainedOperatorVertexIds);
+                                       operatorIDPairs);
 
                        chainedInputOutputFormats
                                .get(streamNodeId)
@@ -441,9 +431,7 @@ public class StreamingJobGraphGenerator {
                        jobVertex = new JobVertex(
                                        chainedNames.get(streamNodeId),
                                        jobVertexId,
-                                       legacyJobVertexIds,
-                                       chainedOperatorVertexIds,
-                                       userDefinedChainedOperatorVertexIds);
+                                       operatorIDPairs);
                }
 
                jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
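Note how the two parallel lists (generated and user-defined chained operator IDs) collapse into a single List<OperatorIDPair>, which keeps both halves aligned per operator by construction. On the DataStream side, the user-defined half comes from an explicit uid hash; a small illustrative pipeline (setUidHash is existing public API, the pipeline itself is made up for the example):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3)
            .map(x -> x + 1)
            // 32 hex chars; becomes the user-defined OperatorID of this operator
            .setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");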
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index d00d0af..7cf7526 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -33,10 +33,8 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -423,27 +421,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
        }
 
        @Test
-       public void testUserProvidedHashing() {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
-               List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
-
-               env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
-                               .map(new NoOpMapFunction())
-                               .filter(new NoOpFilterFunction())
-                               .keyBy(new NoOpKeySelector())
-                               .reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
-
-               StreamGraph streamGraph = env.getStreamGraph();
-               int idx = 1;
-               for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
-                       List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
-                       Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
-                       --idx;
-               }
-       }
-
-       @Test
        public void testUserProvidedHashingOnChainSupported() {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 

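With testUserProvidedHashing removed, the idAlternatives-based check it performed gives way to the vertex's operator ID pairs. A hedged sketch of an equivalent assertion (the Optional-style accessor on OperatorIDPair is an assumption, not shown in this mail):

    for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
            OperatorIDPair pair = jobVertex.getOperatorIDs().get(0);
            Assert.assertTrue(pair.getUserDefinedOperatorID().isPresent());
    }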