This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0114338  [FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
0114338 is described below

commit 0114338da5ce52677d1dfa1ab4350b1567dc3522
Author: edu05 <eduardo.winpe...@gmail.com>
AuthorDate: Wed Apr 29 19:36:08 2020 +0200

    [FLINK-16638][runtime][checkpointing] Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs
---
 .../state/api/input/OperatorStateInputFormat.java  |  3 +-
 .../state/api/runtime/OperatorIDGeneratorTest.java |  2 +-
 .../org/apache/flink/runtime/OperatorIDPair.java   | 58 +++++++++++++++
 .../flink/runtime/checkpoint/Checkpoints.java      | 17 +----
 .../runtime/checkpoint/PendingCheckpoint.java      | 13 ++--
 .../checkpoint/StateAssignmentOperation.java       | 35 ++++-----
 .../runtime/executiongraph/ExecutionJobVertex.java | 87 ++--------------------
 .../runtime/jobgraph/InputOutputFormatVertex.java  |  7 +-
 .../apache/flink/runtime/jobgraph/JobVertex.java   | 49 ++++--------
 .../ChannelStateNoRescalingPartitionerTest.java    |  3 +-
 .../CheckpointCoordinatorRestoringTest.java        | 23 +++---
 .../checkpoint/CheckpointCoordinatorTest.java      |  2 +-
 .../CheckpointCoordinatorTestingUtils.java         | 18 +++--
 .../checkpoint/CheckpointMetadataLoadingTest.java  |  3 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |  4 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  3 +-
 .../checkpoint/StateAssignmentOperationTest.java   | 54 ++++++++++----
 .../executiongraph/LegacyJobVertexIdTest.java      | 64 ----------------
 .../api/graph/StreamingJobGraphGenerator.java      | 24 ++----
 .../StreamingJobGraphGeneratorNodeHashTest.java    | 23 ------
 20 files changed, 191 insertions(+), 301 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
index a312938..ec4c9b1 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/OperatorStateInputFormat.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
@@ -123,7 +124,7 @@ abstract class OperatorStateInputFormat<OT> extends RichInputFormat<OT, Operator
 		Map<OperatorInstanceID, List<OperatorStateHandle>> newManagedOperatorStates =
 			reDistributePartitionableStates(
 				singletonList(operatorState),
 				minNumSplits,
-				singletonList(operatorState.getOperatorID()),
+				singletonList(OperatorIDPair.generatedIDOnly(operatorState.getOperatorID())),
 				OperatorSubtaskState::getManagedOperatorState,
 				RoundRobinOperatorStateRepartitioner.INSTANCE);
 
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
index 9dff2a1..cb90641 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
@@ -65,6 +65,6 @@ public class OperatorIDGeneratorTest {
 			.findFirst()
 			.orElseThrow(() -> new IllegalStateException("Unable to find vertex"));
 
-		return vertex.getOperatorIDs().get(0);
+		return vertex.getOperatorIDs().get(0).getGeneratedOperatorID();
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
new file mode 100644
index 0000000..e6070d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/OperatorIDPair.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Formed of a mandatory operator ID and optionally a user defined operator ID.
+ */
+public class OperatorIDPair implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final OperatorID generatedOperatorID;
+	private final OperatorID userDefinedOperatorID;
+
+	private OperatorIDPair(OperatorID generatedOperatorID, @Nullable OperatorID userDefinedOperatorID) {
+		this.generatedOperatorID = generatedOperatorID;
+		this.userDefinedOperatorID = userDefinedOperatorID;
+	}
+
+	public static OperatorIDPair of(OperatorID generatedOperatorID, @Nullable OperatorID userDefinedOperatorID) {
+		return new OperatorIDPair(generatedOperatorID, userDefinedOperatorID);
+	}
+
+	public static OperatorIDPair generatedIDOnly(OperatorID generatedOperatorID) {
+		return new OperatorIDPair(generatedOperatorID, null);
+	}
+
+	public OperatorID getGeneratedOperatorID() {
+		return generatedOperatorID;
+	}
+
+	public Optional<OperatorID> getUserDefinedOperatorID() {
+		return Optional.ofNullable(userDefinedOperatorID);
+	}
+}
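
For readers skimming the patch: OperatorIDPair is a small serializable value type that always carries the generated operator ID and, optionally, the ID derived from a user-provided uid hash. A quick illustration (sketch only, not part of the diff; the variable names are invented):

    OperatorIDPair withUid = OperatorIDPair.of(generatedId, userDefinedId);
    OperatorIDPair plain = OperatorIDPair.generatedIDOnly(generatedId);

    withUid.getUserDefinedOperatorID().isPresent(); // true
    plain.getUserDefinedOperatorID().isPresent();   // false

Bundling both IDs in one object replaces the parallel "operator IDs" and "alternative operator IDs" lists that the rest of this patch removes.
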
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 17db5e6..37f96f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
 import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializers;
@@ -137,28 +138,18 @@ public class Checkpoints {
 		// generate mapping from operator to task
 		Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
 		for (ExecutionJobVertex task : tasks.values()) {
-			for (OperatorID operatorID : task.getOperatorIDs()) {
-				operatorToJobVertexMapping.put(operatorID, task);
+			for (OperatorIDPair operatorIDPair : task.getOperatorIDs()) {
+				operatorToJobVertexMapping.put(operatorIDPair.getGeneratedOperatorID(), task);
+				operatorIDPair.getUserDefinedOperatorID().ifPresent(id -> operatorToJobVertexMapping.put(id, task));
 			}
 		}
 
 		// (2) validate it (parallelism, etc)
-		boolean expandedToLegacyIds = false;
-
 		HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(checkpointMetadata.getOperatorStates().size());
 		for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
 
 			ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
 
-			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-			// for example as generated from older flink versions, to provide backwards compatibility.
-			if (executionJobVertex == null && !expandedToLegacyIds) {
-				operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
-				executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
-				expandedToLegacyIds = true;
-				LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
-			}
-
 			if (executionJobVertex != null) {
 
 				if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 52844f8..6ccdc53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -371,31 +372,31 @@ public class PendingCheckpoint {
 			acknowledgedTasks.add(executionAttemptId);
 		}
 
-		List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
+		List<OperatorIDPair> operatorIDs = vertex.getJobVertex().getOperatorIDs();
 		int subtaskIndex = vertex.getParallelSubtaskIndex();
 		long ackTimestamp = System.currentTimeMillis();
 
 		long stateSize = 0L;
 		if (operatorSubtaskStates != null) {
-			for (OperatorID operatorID : operatorIDs) {
+			for (OperatorIDPair operatorID : operatorIDs) {
 
 				OperatorSubtaskState operatorSubtaskState =
-					operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);
+					operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID.getGeneratedOperatorID());
 
 				// if no real operatorSubtaskState was reported, we insert an empty state
 				if (operatorSubtaskState == null) {
 					operatorSubtaskState = new OperatorSubtaskState();
 				}
 
-				OperatorState operatorState = operatorStates.get(operatorID);
+				OperatorState operatorState = operatorStates.get(operatorID.getGeneratedOperatorID());
 
 				if (operatorState == null) {
 					operatorState = new OperatorState(
-						operatorID,
+						operatorID.getGeneratedOperatorID(),
 						vertex.getTotalNumberOfParallelSubtasks(),
 						vertex.getMaxParallelism());
-					operatorStates.put(operatorID, operatorState);
+					operatorStates.put(operatorID.getGeneratedOperatorID(), operatorState);
 				}
 
 				operatorState.putState(subtaskIndex, operatorSubtaskState);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 7f49828..9644430 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -86,14 +87,11 @@ public class StateAssignmentOperation {
 		for (ExecutionJobVertex executionJobVertex : this.tasks) {
 
 			// find the states of all operators belonging to this task
-			List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
-			List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
-			List<OperatorState> operatorStates = new ArrayList<>(operatorIDs.size());
+			List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
+			List<OperatorState> operatorStates = new ArrayList<>(operatorIDPairs.size());
 			boolean statelessTask = true;
-			for (int x = 0; x < operatorIDs.size(); x++) {
-				OperatorID operatorID = altOperatorIDs.get(x) == null
-					? operatorIDs.get(x)
-					: altOperatorIDs.get(x);
+			for (OperatorIDPair operatorIDPair : operatorIDPairs) {
+				OperatorID operatorID = operatorIDPair.getUserDefinedOperatorID().orElse(operatorIDPair.getGeneratedOperatorID());
 
 				OperatorState operatorState = localOperators.remove(operatorID);
 				if (operatorState == null) {
@@ -115,7 +113,7 @@ public class StateAssignmentOperation {
 
 	private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {
 
-		List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+		List<OperatorIDPair> operatorIDs = executionJobVertex.getOperatorIDs();
 
 		//1. first compute the new parallelism
 		checkParallelismPreconditions(operatorStates, executionJobVertex);
@@ -214,7 +212,7 @@ public class StateAssignmentOperation {
 			Map<OperatorInstanceID, List<KeyedStateHandle>> subRawKeyedState,
 			int newParallelism) {
 
-		List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
+		List<OperatorIDPair> operatorIDs = executionJobVertex.getOperatorIDs();
 
 		for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
 
@@ -224,8 +222,8 @@ public class StateAssignmentOperation {
 			TaskStateSnapshot taskState = new TaskStateSnapshot(operatorIDs.size());
 			boolean statelessTask = true;
 
-			for (OperatorID operatorID : operatorIDs) {
-				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID);
+			for (OperatorIDPair operatorID : operatorIDs) {
+				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, operatorID.getGeneratedOperatorID());
 
 				OperatorSubtaskState operatorSubtaskState = operatorSubtaskStateFrom(
 					instanceID,
@@ -239,7 +237,7 @@ public class StateAssignmentOperation {
 				if (operatorSubtaskState.hasState()) {
 					statelessTask = false;
 				}
-				taskState.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+				taskState.putSubtaskStateByOperatorID(operatorID.getGeneratedOperatorID(), operatorSubtaskState);
 			}
 
 			if (!statelessTask) {
@@ -288,7 +286,7 @@ public class StateAssignmentOperation {
 	private void reDistributeKeyedStates(
 			List<OperatorState> oldOperatorStates,
 			int newParallelism,
-			List<OperatorID> newOperatorIDs,
+			List<OperatorIDPair> newOperatorIDs,
 			List<KeyGroupRange> newKeyGroupPartitions,
 			Map<OperatorInstanceID, List<KeyedStateHandle>> newManagedKeyedState,
 			Map<OperatorInstanceID, List<KeyedStateHandle>> newRawKeyedState) {
@@ -300,7 +298,7 @@ public class StateAssignmentOperation {
 			OperatorState operatorState = oldOperatorStates.get(operatorIndex);
 			int oldParallelism = operatorState.getParallelism();
 			for (int subTaskIndex = 0; subTaskIndex < newParallelism; subTaskIndex++) {
-				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex));
+				OperatorInstanceID instanceID = OperatorInstanceID.of(subTaskIndex, newOperatorIDs.get(operatorIndex).getGeneratedOperatorID());
 				Tuple2<List<KeyedStateHandle>, List<KeyedStateHandle>> subKeyedStates = reAssignSubKeyedStates(
 					operatorState,
 					newKeyGroupPartitions,
@@ -347,7 +345,7 @@ public class StateAssignmentOperation {
 	public static <T extends StateObject> Map<OperatorInstanceID, List<T>> reDistributePartitionableStates(
 			List<OperatorState> oldOperatorStates,
 			int newParallelism,
-			List<OperatorID> newOperatorIDs,
+			List<OperatorIDPair> newOperatorIDs,
 			Function<OperatorSubtaskState, StateObjectCollection<T>> extractHandle,
 			OperatorStateRepartitioner<T> stateRepartitioner) {
 
@@ -361,7 +359,7 @@ public class StateAssignmentOperation {
 		Map<OperatorInstanceID, List<T>> result = new HashMap<>();
 		for (int operatorIndex = 0; operatorIndex < newOperatorIDs.size(); operatorIndex++) {
 			result.putAll(applyRepartitioner(
-				newOperatorIDs.get(operatorIndex),
+				newOperatorIDs.get(operatorIndex).getGeneratedOperatorID(),
 				stateRepartitioner,
 				oldStates.get(operatorIndex),
 				oldOperatorStates.get(operatorIndex).getParallelism(),
@@ -555,7 +553,10 @@ public class StateAssignmentOperation {
 
 		Set<OperatorID> allOperatorIDs = new HashSet<>();
 		for (ExecutionJobVertex executionJobVertex : tasks) {
-			allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
+			for (OperatorIDPair operatorIDPair : executionJobVertex.getOperatorIDs()) {
+				allOperatorIDs.add(operatorIDPair.getGeneratedOperatorID());
+				operatorIDPair.getUserDefinedOperatorID().ifPresent(allOperatorIDs::add);
+			}
 		}
 		for (Map.Entry<OperatorID, OperatorState> operatorGroupStateEntry : operatorStates.entrySet()) {
 			OperatorState operatorState = operatorGroupStateEntry.getValue();
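
The behavioral core of the fix sits in the StateAssignmentOperation hunks above: when matching checkpointed OperatorState to operators, the user-defined ID wins and the generated ID is the fallback, while checkStateMappingCompleteness (the last hunk) now collects both IDs of every pair, so state registered under a uid-based ID is no longer reported as unmatched. In isolation (sketch only, mirroring the line added above):

    OperatorID restoreId = operatorIDPair.getUserDefinedOperatorID()
        .orElse(operatorIDPair.getGeneratedOperatorID());
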
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9cca608..ee2cd2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -91,24 +92,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	private final JobVertex jobVertex;
 
-	/**
-	 * The IDs of all operators contained in this execution job vertex.
-	 *
-	 * <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
-	 *  A - B - D
-	 *   \    \
-	 *    C    E
-	 * This is the same order that operators are stored in the {@code StreamTask}.
-	 */
-	private final List<OperatorID> operatorIDs;
-
-	/**
-	 * The alternative IDs of all operators contained in this execution job vertex.
-	 *
-	 * <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
-	 */
-	private final List<OperatorID> userDefinedOperatorIds;
-
 	private final ExecutionVertex[] taskVertices;
 
 	private final IntermediateResult[] producedDataSets;
@@ -200,8 +183,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
 
 		this.taskVertices = new ExecutionVertex[numTaskVertices];
-		this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
-		this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
 
 		this.inputs = new ArrayList<>(jobVertex.getInputs().size());
 
@@ -289,21 +270,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	}
 
 	/**
-	 * Returns a list containing the IDs of all operators contained in this execution job vertex.
+	 * Returns a list containing the ID pairs of all operators contained in this execution job vertex.
 	 *
-	 * @return list containing the IDs of all contained operators
+	 * @return list containing the ID pairs of all contained operators
 	 */
-	public List<OperatorID> getOperatorIDs() {
-		return operatorIDs;
-	}
-
-	/**
-	 * Returns a list containing the alternative IDs of all operators contained in this execution job vertex.
-	 *
-	 * @return list containing alternative the IDs of all contained operators
-	 */
-	public List<OperatorID> getUserDefinedOperatorIDs() {
-		return userDefinedOperatorIds;
+	public List<OperatorIDPair> getOperatorIDs() {
+		return jobVertex.getOperatorIDs();
 	}
 
 	public void setMaxParallelism(int maxParallelismDerived) {
@@ -626,53 +598,4 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			return ExecutionState.CREATED;
 		}
 	}
-
-	public static Map<JobVertexID, ExecutionJobVertex> includeLegacyJobVertexIDs(
-			Map<JobVertexID, ExecutionJobVertex> tasks) {
-
-		Map<JobVertexID, ExecutionJobVertex> expanded = new HashMap<>(2 * tasks.size());
-		// first include all new ids
-		expanded.putAll(tasks);
-
-		// now expand and add legacy ids
-		for (ExecutionJobVertex executionJobVertex : tasks.values()) {
-			if (null != executionJobVertex) {
-				JobVertex jobVertex = executionJobVertex.getJobVertex();
-				if (null != jobVertex) {
-					List<JobVertexID> alternativeIds = jobVertex.getIdAlternatives();
-					for (JobVertexID jobVertexID : alternativeIds) {
-						ExecutionJobVertex old = expanded.put(jobVertexID, executionJobVertex);
-						Preconditions.checkState(null == old || old.equals(executionJobVertex),
-							"Ambiguous jobvertex id detected during expansion to legacy ids.");
-					}
-				}
-			}
-		}
-
-		return expanded;
-	}
-
-	public static Map<OperatorID, ExecutionJobVertex> includeAlternativeOperatorIDs(
-			Map<OperatorID, ExecutionJobVertex> operatorMapping) {
-
-		Map<OperatorID, ExecutionJobVertex> expanded = new HashMap<>(2 * operatorMapping.size());
-		// first include all existing ids
-		expanded.putAll(operatorMapping);
-
-		// now expand and add user-defined ids
-		for (ExecutionJobVertex executionJobVertex : operatorMapping.values()) {
-			if (executionJobVertex != null) {
-				JobVertex jobVertex = executionJobVertex.getJobVertex();
-				if (jobVertex != null) {
-					for (OperatorID operatorID : jobVertex.getUserDefinedOperatorIDs()) {
-						if (operatorID != null) {
-							expanded.put(operatorID, executionJobVertex);
-						}
-					}
-				}
-			}
-		}
-
-		return expanded;
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
index 741df52..f1e7a2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputOutputFormatVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 
 import java.util.HashMap;
@@ -48,11 +49,9 @@ public class InputOutputFormatVertex extends JobVertex {
 	public InputOutputFormatVertex(
 		String name,
 		JobVertexID id,
-		List<JobVertexID> alternativeIds,
-		List<OperatorID> operatorIds,
-		List<OperatorID> alternativeOperatorIds) {
+		List<OperatorIDPair> operatorIDPairs) {
 
-		super(name, id, alternativeIds, operatorIds, alternativeOperatorIds);
+		super(name, id, operatorIDPairs);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index e9616a7..c77103d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -55,14 +56,15 @@ public class JobVertex implements java.io.Serializable {
 	/** The ID of the vertex. */
 	private final JobVertexID id;
 
-	/** The alternative IDs of the vertex. */
-	private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
-
-	/** The IDs of all operators contained in this vertex. */
-	private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();
-
-	/** The alternative IDs of all operators contained in this vertex. */
-	private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();
+	/** The IDs of all operators contained in this vertex.
+	 *
+	 * <p>The ID pairs are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
+	 *  A - B - D
+	 *   \    \
+	 *    C    E
+	 * This is the same order that operators are stored in the {@code StreamTask}.
+	 */
+	private final List<OperatorIDPair> operatorIDs;
 
 	/** List of produced data sets, one per writer. */
 	private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
@@ -143,9 +145,8 @@ public class JobVertex implements java.io.Serializable {
 	public JobVertex(String name, JobVertexID id) {
 		this.name = name == null ? DEFAULT_NAME : name;
 		this.id = id == null ? new JobVertexID() : id;
-		// the id lists must have the same size
-		this.operatorIDs.add(OperatorID.fromJobVertexID(this.id));
-		this.operatorIdsAlternatives.add(null);
+		OperatorIDPair operatorIDPair = OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(this.id));
+		this.operatorIDs = Collections.singletonList(operatorIDPair);
 	}
 
 	/**
@@ -153,17 +154,12 @@ public class JobVertex implements java.io.Serializable {
 	 *
 	 * @param name The name of the new job vertex.
 	 * @param primaryId The id of the job vertex.
-	 * @param alternativeIds The alternative ids of the job vertex.
-	 * @param operatorIds The ids of all operators contained in this job vertex.
-	 * @param alternativeOperatorIds The alternative ids of all operators contained in this job vertex-
+	 * @param operatorIDPairs The operator ID pairs of the job vertex.
 	 */
-	public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {
-		Preconditions.checkArgument(operatorIds.size() == alternativeOperatorIds.size());
+	public JobVertex(String name, JobVertexID primaryId, List<OperatorIDPair> operatorIDPairs) {
 		this.name = name == null ? DEFAULT_NAME : name;
 		this.id = primaryId == null ? new JobVertexID() : primaryId;
-		this.idAlternatives.addAll(alternativeIds);
-		this.operatorIDs.addAll(operatorIds);
-		this.operatorIdsAlternatives.addAll(alternativeOperatorIds);
+		this.operatorIDs = Collections.unmodifiableList(operatorIDPairs);
 	}
 
 	// --------------------------------------------------------------------------------------------
 
@@ -178,15 +174,6 @@ public class JobVertex implements java.io.Serializable {
 	}
 
 	/**
-	 * Returns a list of all alternative IDs of this job vertex.
-	 *
-	 * @return List of all alternative IDs for this job vertex
-	 */
-	public List<JobVertexID> getIdAlternatives() {
-		return idAlternatives;
-	}
-
-	/**
 	 * Returns the name of the vertex.
 	 *
 	 * @return The name of the vertex.
@@ -222,14 +209,10 @@ public class JobVertex implements java.io.Serializable {
 		return this.inputs.size();
 	}
 
-	public List<OperatorID> getOperatorIDs() {
+	public List<OperatorIDPair> getOperatorIDs() {
 		return operatorIDs;
 	}
 
-	public List<OperatorID> getUserDefinedOperatorIDs() {
-		return operatorIdsAlternatives;
-	}
-
 	/**
 	 * Returns the vertex's configuration object which can be used to pass custom settings to the task at runtime.
 	 *
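
With the JobVertex changes above, a vertex is now constructed from one OperatorIDPair per chained operator instead of three parallel ID lists. A hypothetical call site (sketch only; the hashes and names are invented):

    List<OperatorIDPair> pairs = Arrays.asList(
        OperatorIDPair.of(new OperatorID(mapHash), new OperatorID(mapUidHash)), // operator with a uid hash
        OperatorIDPair.generatedIDOnly(new OperatorID(filterHash)));            // operator without one
    JobVertex vertex = new JobVertex("map -> filter", new JobVertexID(vertexHash), pairs);

This also retires the Preconditions.checkArgument that kept the two old lists the same length.
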
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
index a7763cb..91ae0ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ChannelStateNoRescalingPartitionerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -101,7 +102,7 @@ public class ChannelStateNoRescalingPartitionerTest {
 			StateAssignmentOperation.reDistributePartitionableStates(
 				singletonList(state),
 				newParallelism,
-				singletonList(OPERATOR_ID),
+				singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID)),
 				(Function<OperatorSubtaskState, StateObjectCollection<T>>) this.extractState,
 				channelStateNonRescalingRepartitioner("test"));
 		} catch (IllegalArgumentException e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index d5acd88..0f03875 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -506,7 +507,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 		List<List<Collection<OperatorStateHandle>>> actualOpStatesRaw = new ArrayList<>(newJobVertex2.getParallelism());
 		for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
 
-			List<OperatorID> operatorIDs = newJobVertex2.getOperatorIDs();
+			List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
 
 			KeyGroupsStateHandle originalKeyedStateBackend = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), false);
 			KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), true);
@@ -520,7 +521,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			List<Collection<OperatorStateHandle>> allParallelRawOpStates = new ArrayList<>(operatorIDs.size());
 
 			for (int idx = 0; idx < operatorIDs.size(); ++idx) {
-				OperatorID operatorID = operatorIDs.get(idx);
+				OperatorID operatorID = operatorIDs.get(idx).getGeneratedOperatorID();
 				OperatorSubtaskState opState = taskStateHandles.getSubtaskStateByOperatorID(operatorID);
 				Collection<OperatorStateHandle> opStateBackend = opState.getManagedOperatorState();
 				Collection<OperatorStateHandle> opStateRaw = opState.getRawOperatorState();
@@ -827,13 +828,13 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
 		for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
 
-			final List<OperatorID> operatorIds = newJobVertex1.getOperatorIDs();
+			final List<OperatorIDPair> operatorIDs = newJobVertex1.getOperatorIDs();
 
 			JobManagerTaskRestore taskRestore = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
 			Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
 			TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
-			OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
+			OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
 			assertTrue(headOpState.getManagedKeyedState().isEmpty());
 			assertTrue(headOpState.getRawKeyedState().isEmpty());
 
@@ -841,7 +842,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 2;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				assertTrue(opState.getManagedOperatorState().isEmpty());
 				assertTrue(opState.getRawOperatorState().isEmpty());
@@ -850,7 +851,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 1;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle(
 					id1.f0, i, 2, 8, false);
@@ -871,7 +872,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 0;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				OperatorStateHandle expectedManagedOpState = generatePartitionableStateHandle(
 					id2.f0, i, 2, 8, false);
@@ -895,7 +896,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
 		for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
 
-			final List<OperatorID> operatorIds = newJobVertex2.getOperatorIDs();
+			final List<OperatorIDPair> operatorIDs = newJobVertex2.getOperatorIDs();
 
 			JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
 			Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
@@ -905,7 +906,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 1;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = new ArrayList<>(1);
 				actualSubManagedOperatorState.add(opState.getManagedOperatorState());
@@ -921,7 +922,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			{
 				int operatorIndexInChain = 0;
 				OperatorSubtaskState opState =
-					stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIndexInChain));
+					stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
 				assertTrue(opState.getManagedOperatorState().isEmpty());
 				assertTrue(opState.getRawOperatorState().isEmpty());
@@ -931,7 +932,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), true);
 
 			OperatorSubtaskState headOpState =
-				stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
+				stateSnapshot.getSubtaskStateByOperatorID(operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
 
 			Collection<KeyedStateHandle> keyedStateBackend = headOpState.getManagedKeyedState();
 			Collection<KeyedStateHandle> keyGroupStateRaw = headOpState.getRawKeyedState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ce662f6..d0df7e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2363,7 +2363,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>();
 
-		opStates.put(jobVertex1.getOperatorIDs().get(0), operatorSubtaskState);
+		opStates.put(jobVertex1.getOperatorIDs().get(0).getGeneratedOperatorID(), operatorSubtaskState);
 
 		TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 95f7b58..7b1ff45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -65,7 +66,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -357,8 +357,11 @@ public class CheckpointCoordinatorTestingUtils {
 		when(executionJobVertex.getParallelism()).thenReturn(parallelism);
 		when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
 		when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true);
-		when(executionJobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
-		when(executionJobVertex.getUserDefinedOperatorIDs()).thenReturn(Arrays.asList(new OperatorID[jobVertexIDs.size()]));
+		List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
+		for (OperatorID operatorID : jobVertexIDs) {
+			operatorIDPairs.add(OperatorIDPair.generatedIDOnly(operatorID));
+		}
+		when(executionJobVertex.getOperatorIDs()).thenReturn(operatorIDPairs);
 
 		return executionJobVertex;
 	}
@@ -455,7 +458,11 @@ public class CheckpointCoordinatorTestingUtils {
 		when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
 
 		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
+		List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
+		for (OperatorID operatorID : jobVertexIDs) {
+			operatorIDPairs.add(OperatorIDPair.generatedIDOnly(operatorID));
+		}
+		when(jobVertex.getOperatorIDs()).thenReturn(operatorIDPairs);
 
 		when(vertex.getJobVertex()).thenReturn(jobVertex);
 
@@ -560,8 +567,7 @@ public class CheckpointCoordinatorTestingUtils {
 		when(vertex.getMaxParallelism()).thenReturn(vertices.length);
 		when(vertex.getJobVertexId()).thenReturn(id);
 		when(vertex.getTaskVertices()).thenReturn(vertices);
-		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
-		when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));
 
 		for (ExecutionVertex v : vertices) {
 			when(v.getJobVertex()).thenReturn(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 6d12b14..fc93b2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -104,7 +105,7 @@ public class CheckpointMetadataLoadingTest {
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 		when(vertex.getParallelism()).thenReturn(parallelism);
 		when(vertex.getMaxParallelism()).thenReturn(parallelism);
-		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)));
 
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 		tasks.put(jobVertexID, vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 958f3f1..bd7f2ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -312,8 +313,7 @@ public class CheckpointStateRestoreTest {
 		when(vertex.getMaxParallelism()).thenReturn(vertices.length);
 		when(vertex.getJobVertexId()).thenReturn(id);
 		when(vertex.getTaskVertices()).thenReturn(vertices);
-		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
-		when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(id))));
 
 		for (ExecutionVertex v : vertices) {
 			when(v.getJobVertex()).thenReturn(vertex);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index f172e51..5afcc73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
@@ -87,7 +88,7 @@ public class PendingCheckpointTest {
 
 	static {
 		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-		when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(new OperatorID()));
+		when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(new OperatorID())));
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getMaxParallelism()).thenReturn(128);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 5d12738..7fed3cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -49,7 +50,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewKeyedStateHandle;
@@ -173,7 +173,7 @@ public class StateAssignmentOperationTest extends TestLogger {
 		StateAssignmentOperation.reDistributePartitionableStates(
 			Collections.singletonList(operatorState),
 			newParallelism,
-			Collections.singletonList(operatorID),
+			Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)),
 			OperatorSubtaskState::getManagedOperatorState,
 			RoundRobinOperatorStateRepartitioner.INSTANCE
 		);
@@ -324,6 +324,21 @@ public class StateAssignmentOperationTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void assigningStatesShouldWorkWithUserDefinedOperatorIdsAsWell() throws JobException, JobExecutionException {
+		int numSubTasks = 1;
+		OperatorID operatorId = new OperatorID();
+		OperatorID userDefinedOperatorId = new OperatorID();
+		Set<OperatorID> operatorIds = Collections.singleton(userDefinedOperatorId);
+
+		ExecutionJobVertex executionJobVertex = buildExecutionJobVertex(operatorId, userDefinedOperatorId, 1);
+		Map<OperatorID, OperatorState> states = buildOperatorStates(operatorIds, numSubTasks);
+
+		new StateAssignmentOperation(0, Collections.singleton(executionJobVertex), states, false).assignStates();
+
+		Assert.assertEquals(states.get(userDefinedOperatorId).getState(0), getAssignedState(executionJobVertex, operatorId, 0));
+	}
+
 	private Set<OperatorID> buildOperatorIds(int operators) {
 		Set<OperatorID> set = new HashSet<>();
 		for (int j = 0; j < operators; j++) {
@@ -349,21 +364,28 @@ public class StateAssignmentOperationTest extends TestLogger {
 		}));
 	}
 
-	private Map<OperatorID, ExecutionJobVertex> buildVertices(Set<OperatorID> operators, int parallelism) throws JobException, JobExecutionException {
+	private Map<OperatorID, ExecutionJobVertex> buildVertices(Set<OperatorID> operators, int parallelism) {
+		return operators.stream()
+			.collect(Collectors.toMap(Function.identity(), operatorID -> {
+				try {
+					return buildExecutionJobVertex(operatorID, parallelism);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			}));
+	}
+
+	private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, int parallelism) throws JobException, JobExecutionException {
+		return buildExecutionJobVertex(operatorID, operatorID, parallelism);
+	}
+
+	private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, OperatorID userDefinedOperatorId, int parallelism) throws JobException, JobExecutionException {
 		ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder().build();
-		return operators.stream().collect(Collectors.toMap(Function.identity(), operatorID -> {
-			JobVertex jobVertex = new JobVertex(
-				operatorID.toHexString(),
-				new JobVertexID(),
-				emptyList(),
-				singletonList(operatorID),
-				singletonList(operatorID));
-			try {
-				return new ExecutionJobVertex(graph, jobVertex, parallelism, 1, Time.seconds(1), 1L, 1L);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		}));
+		JobVertex jobVertex = new JobVertex(
+			operatorID.toHexString(),
+			new JobVertexID(),
+			singletonList(OperatorIDPair.of(operatorID, userDefinedOperatorId)));
+		return new ExecutionJobVertex(graph, jobVertex, parallelism, 1, Time.seconds(1), 1L, 1L);
 	}
 
 	private OperatorSubtaskState getAssignedState(ExecutionJobVertex executionJobVertex, OperatorID operatorId, int subtaskIdx) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
deleted file mode 100644
index d70b524..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class LegacyJobVertexIdTest {
-
-	@Test
-	public void testIntroduceLegacyJobVertexIds() throws Exception {
-		JobVertexID defaultId = new JobVertexID();
-		JobVertexID legacyId1 = new JobVertexID();
-		JobVertexID legacyId2 = new JobVertexID();
-
-		JobVertex jobVertex = new JobVertex("test", defaultId, Arrays.asList(legacyId1, legacyId2), new ArrayList<>(), new ArrayList<>());
-		jobVertex.setInvokableClass(AbstractInvokable.class);
-
-		ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder().build();
-
-		ExecutionJobVertex executionJobVertex =
-				new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1));
-
-		Map<JobVertexID, ExecutionJobVertex> idToVertex = new HashMap<>();
-		idToVertex.put(executionJobVertex.getJobVertexId(), executionJobVertex);
-
-		Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
-		Assert.assertNull(idToVertex.get(legacyId1));
-		Assert.assertNull(idToVertex.get(legacyId2));
-
-		idToVertex = ExecutionJobVertex.includeLegacyJobVertexIDs(idToVertex);
-
-		Assert.assertEquals(3, idToVertex.size());
-		Assert.assertEquals(executionJobVertex, idToVertex.get(defaultId));
-		Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1));
-		Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2));
-	}
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1065568..52d67f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -408,21 +409,12 @@ public class StreamingJobGraphGenerator {
 
 		JobVertexID jobVertexId = new JobVertexID(hash);
 
-		List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
-		for (Map<Integer, byte[]> legacyHash : legacyHashes) {
-			hash = legacyHash.get(streamNodeId);
-			if (null != hash) {
-				legacyJobVertexIds.add(new JobVertexID(hash));
-			}
-		}
-
 		List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
-		List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
-		List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
+		List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
 		if (chainedOperators != null) {
 			for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
-				chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
-				userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
+				OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
+				operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
 			}
 		}
 
@@ -430,9 +422,7 @@ public class StreamingJobGraphGenerator {
 			jobVertex = new InputOutputFormatVertex(
 				chainedNames.get(streamNodeId),
 				jobVertexId,
-				legacyJobVertexIds,
-				chainedOperatorVertexIds,
-				userDefinedChainedOperatorVertexIds);
+				operatorIDPairs);
 
 			chainedInputOutputFormats
 				.get(streamNodeId)
@@ -441,9 +431,7 @@ public class StreamingJobGraphGenerator {
 			jobVertex = new JobVertex(
 				chainedNames.get(streamNodeId),
 				jobVertexId,
-				legacyJobVertexIds,
-				chainedOperatorVertexIds,
-				userDefinedChainedOperatorVertexIds);
+				operatorIDPairs);
 		}
 
 		jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
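
For context on where a user-defined operator ID comes from in the first place (compare the testUserProvidedHashing test removed below): a pipeline author pins an operator's identity with setUidHash, and StreamingJobGraphGenerator turns that 32-character hex hash into the user-defined half of the pair, e.g. (sketch only):

    env.addSource(new NoOpSourceFunction(), "src")
        .setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
        .map(new NoOpMapFunction());

Chained operators without such a hash get a null user-defined ID, which OperatorIDPair.of stores as absent.
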
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index d00d0af..7cf7526 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -33,10 +33,8 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -423,27 +421,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 	}
 
 	@Test
-	public void testUserProvidedHashing() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
-		List<String> userHashes = Arrays.asList("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
-
-		env.addSource(new NoOpSourceFunction(), "src").setUidHash(userHashes.get(0))
-			.map(new NoOpMapFunction())
-			.filter(new NoOpFilterFunction())
-			.keyBy(new NoOpKeySelector())
-			.reduce(new NoOpReduceFunction()).name("reduce").setUidHash(userHashes.get(1));
-
-		StreamGraph streamGraph = env.getStreamGraph();
-		int idx = 1;
-		for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
-			List<JobVertexID> idAlternatives = jobVertex.getIdAlternatives();
-			Assert.assertEquals(idAlternatives.get(idAlternatives.size() - 1).toString(), userHashes.get(idx));
-			--idx;
-		}
-	}
-
-	@Test
 	public void testUserProvidedHashingOnChainSupported() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();