This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4597d55 [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode 4597d55 is described below commit 4597d5557c640e0ef5a526cbb6d46686be5dd813 Author: Austin Cawley-Edwards <austin.caw...@gmail.com> AuthorDate: Wed Mar 31 04:38:24 2021 -0400 [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode This moves the configuration and management of vertex parallelism into the control of the scheduler, instead of the ExecutionGraphVertex. This gives the Adaptive scheduler assurances about the task resources when scheduling. --- .../flink/runtime/checkpoint/Checkpoints.java | 3 +- .../checkpoint/StateAssignmentOperation.java | 9 +- .../dispatcher/DefaultJobManagerRunnerFactory.java | 2 - .../executiongraph/DefaultExecutionGraph.java | 15 +- .../DefaultExecutionGraphBuilder.java | 7 +- .../runtime/executiongraph/ExecutionJobVertex.java | 86 +++------ .../MutableVertexAttemptNumberStore.java | 15 +- .../scheduler/DefaultExecutionGraphFactory.java | 4 +- .../scheduler/DefaultVertexParallelismInfo.java | 112 +++++++++++ .../scheduler/DefaultVertexParallelismStore.java | 47 +++++ .../runtime/scheduler/ExecutionGraphFactory.java | 3 + .../scheduler/MutableVertexParallelismStore.java | 32 ++++ .../flink/runtime/scheduler/SchedulerBase.java | 59 ++++++ .../scheduler/VertexParallelismInformation.java | 55 ++++++ .../runtime/scheduler/VertexParallelismStore.java | 36 ++++ .../scheduler/adaptive/AdaptiveScheduler.java | 137 +++++++++++++- .../scheduler/adaptive/JobGraphJobInformation.java | 27 ++- .../scheduler/adaptive/ReactiveModeUtils.java | 51 ----- .../checkpoint/CheckpointMetadataLoadingTest.java | 3 +- .../DefaultExecutionGraphConstructionTest.java | 97 +++++----- .../executiongraph/ExecutionGraphTestUtils.java | 5 + .../executiongraph/ExecutionJobVertexTest.java | 109 ++--------- .../executiongraph/PointwisePatternTest.java | 11 +- 
.../TestingDefaultExecutionGraphBuilder.java | 14 +- .../executiongraph/VertexSlotSharingTest.java | 7 +- .../DefaultExecutionGraphFactoryTest.java | 2 + .../DefaultVertexParallelismInfoTest.java | 131 +++++++++++++ .../DefaultVertexParallelismStoreTest.java | 73 +++++++ .../SchedulerBaseComputeVertexParallelismTest.java | 94 +++++++++ ...erComputeReactiveModeVertexParallelismTest.java | 124 ++++++++++++ .../scheduler/adaptive/AdaptiveSchedulerTest.java | 209 +++++++++++++++------ .../runtime/scheduler/adaptive/ExecutingTest.java | 3 + .../topology/BuildExecutionGraphBenchmark.java | 9 +- .../partitioner/RescalePartitionerTest.java | 7 +- .../flink/test/checkpointing/RescalingITCase.java | 4 +- 35 files changed, 1243 insertions(+), 359 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java index fc792ca..45ff0d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java @@ -162,7 +162,8 @@ public class Checkpoints { if (executionJobVertex != null) { if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism() - || !executionJobVertex.isMaxParallelismConfigured()) { + || executionJobVertex.canRescaleMaxParallelism( + operatorState.getMaxParallelism())) { operatorStates.put(operatorState.getOperatorID(), operatorState); } else { String msg = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index 3381db3..42d7edf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -650,19 +650,16 @@ public 
class StateAssignmentOperation { // satisfy the restored state if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { - if (!executionJobVertex.isMaxParallelismConfigured()) { - // if the max parallelism was not explicitly specified by the user, we derive it - // from the state - + if (executionJobVertex.canRescaleMaxParallelism(operatorState.getMaxParallelism())) { LOG.debug( - "Overriding maximum parallelism for JobVertex {} from {} to {}", + "Rescaling maximum parallelism for JobVertex {} from {} to {}", executionJobVertex.getJobVertexId(), executionJobVertex.getMaxParallelism(), operatorState.getMaxParallelism()); executionJobVertex.setMaxParallelism(operatorState.getMaxParallelism()); } else { - // if the max parallelism was explicitly specified, we complain on mismatch + // if the max parallelism cannot be rescaled, we complain on mismatch throw new IllegalStateException( "The maximum parallelism (" + operatorState.getMaxParallelism() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java index e8543d4..10726d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.scheduler.adaptive.ReactiveModeUtils; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleServiceLoader; import org.apache.flink.util.Preconditions; @@ -70,7 +69,6 @@ public enum 
DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory { slotPoolServiceSchedulerFactory.getSchedulerType() == JobManagerOptions.SchedulerType.Adaptive, "Adaptive Scheduler is required for reactive mode"); - ReactiveModeUtils.configureJobGraphForReactiveMode(jobGraph); } final ShuffleMaster<?> shuffleMaster = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index a96bd4c..222d9c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -64,6 +64,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.scheduler.InternalFailuresListener; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; @@ -226,6 +228,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final VertexAttemptNumberStore initialAttemptCounts; + private final VertexParallelismStore parallelismStore; + // ------ Fields that are relevant to the execution and need to be cleared before archiving // ------- @@ -279,7 +283,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG ExecutionDeploymentListener executionDeploymentListener, ExecutionStateUpdateListener executionStateUpdateListener, long initializationTimestamp, - 
VertexAttemptNumberStore initialAttemptCounts) + VertexAttemptNumberStore initialAttemptCounts, + VertexParallelismStore vertexParallelismStore) throws IOException { this.jobInformation = checkNotNull(jobInformation); @@ -336,6 +341,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG this.initialAttemptCounts = initialAttemptCounts; + this.parallelismStore = vertexParallelismStore; + this.edgeManager = new EdgeManager(); this.executionVerticesById = new HashMap<>(); this.resultPartitionsById = new HashMap<>(); @@ -770,6 +777,9 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG this.isStoppable = false; } + VertexParallelismInformation parallelismInfo = + parallelismStore.getParallelismInfo(jobVertex.getID()); + // create the execution job vertex and attach it to the graph ExecutionJobVertex ejv = new ExecutionJobVertex( @@ -778,7 +788,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG maxPriorAttemptsHistoryLength, rpcTimeout, createTimestamp, - this.initialAttemptCounts.getAttemptCounts(jobVertex.getID())); + parallelismInfo, + initialAttemptCounts.getAttemptCounts(jobVertex.getID())); ejv.connectToPredecessors(this.intermediateResults); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index 57dbcd3..be546c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import 
org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLoader; @@ -91,7 +92,8 @@ public class DefaultExecutionGraphBuilder { ExecutionDeploymentListener executionDeploymentListener, ExecutionStateUpdateListener executionStateUpdateListener, long initializationTimestamp, - VertexAttemptNumberStore vertexAttemptNumberStore) + VertexAttemptNumberStore vertexAttemptNumberStore, + VertexParallelismStore vertexParallelismStore) throws JobExecutionException, JobException { checkNotNull(jobGraph, "job graph cannot be null"); @@ -134,7 +136,8 @@ public class DefaultExecutionGraphBuilder { executionDeploymentListener, executionStateUpdateListener, initializationTimestamp, - vertexAttemptNumberStore); + vertexAttemptNumberStore, + vertexParallelismStore); } catch (IOException e) { throw new JobException("Could not create the ExecutionGraph.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 66b32d3..050d7a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -46,11 +45,10 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.types.Either; import org.apache.flink.util.IOUtils; import org.apache.flink.util.OptionalFailure; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; @@ -85,8 +83,6 @@ public class ExecutionJobVertex /** Use the same log for all ExecutionGraph classes. */ private static final Logger LOG = DefaultExecutionGraph.LOG; - public static final int VALUE_NOT_SET = -1; - private final Object stateMonitor = new Object(); private final InternalExecutionGraphAccessor graph; @@ -99,7 +95,7 @@ public class ExecutionJobVertex private final List<IntermediateResult> inputs; - private final int parallelism; + private final VertexParallelismInformation parallelismInfo; private final SlotSharingGroup slotSharingGroup; @@ -107,10 +103,6 @@ public class ExecutionJobVertex private final InputSplit[] inputSplits; - private final boolean maxParallelismConfigured; - - private int maxParallelism; - private final ResourceProfile resourceProfile; /** @@ -132,6 +124,7 @@ public class ExecutionJobVertex int maxPriorAttemptsHistoryLength, Time timeout, long createTimestamp, + VertexParallelismInformation parallelismInfo, SubtaskAttemptNumberStore initialAttemptCounts) throws JobException { @@ -142,30 +135,22 @@ public class ExecutionJobVertex this.graph = graph; this.jobVertex = jobVertex; - this.parallelism = jobVertex.getParallelism() > 0 ? 
jobVertex.getParallelism() : 1; - - final int configuredMaxParallelism = jobVertex.getMaxParallelism(); - - this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism); - - // if no max parallelism was configured by the user, we calculate and set a default - setMaxParallelismInternal( - maxParallelismConfigured - ? configuredMaxParallelism - : KeyGroupRangeAssignment.computeDefaultMaxParallelism(this.parallelism)); + this.parallelismInfo = parallelismInfo; // verify that our parallelism is not higher than the maximum parallelism - if (this.parallelism > maxParallelism) { + if (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) { throw new JobException( String.format( "Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.", - jobVertex.getName(), this.parallelism, maxParallelism)); + jobVertex.getName(), + this.parallelismInfo.getParallelism(), + this.parallelismInfo.getMaxParallelism())); } this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO); - this.taskVertices = new ExecutionVertex[this.parallelism]; + this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()]; this.inputs = new ArrayList<>(jobVertex.getInputs().size()); @@ -182,11 +167,14 @@ public class ExecutionJobVertex this.producedDataSets[i] = new IntermediateResult( - result.getId(), this, this.parallelism, result.getResultType()); + result.getId(), + this, + this.parallelismInfo.getParallelism(), + result.getResultType()); } // create all task vertices - for (int i = 0; i < this.parallelism; i++) { + for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) { ExecutionVertex vertex = new ExecutionVertex( this, @@ -203,7 +191,7 @@ public class ExecutionJobVertex // sanity check for the double referencing between intermediate result partitions and // execution vertices for (IntermediateResult ir : 
this.producedDataSets) { - if (ir.getNumberOfAssignedPartitions() != parallelism) { + if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) { throw new RuntimeException( "The intermediate result's partitions were not correctly assigned."); } @@ -242,7 +230,8 @@ public class ExecutionJobVertex ClassLoader oldContextClassLoader = currentThread.getContextClassLoader(); currentThread.setContextClassLoader(graph.getUserClassLoader()); try { - inputSplits = splitSource.createInputSplits(this.parallelism); + inputSplits = + splitSource.createInputSplits(this.parallelismInfo.getParallelism()); if (inputSplits != null) { splitAssigner = splitSource.getInputSplitAssigner(inputSplits); @@ -269,31 +258,8 @@ public class ExecutionJobVertex return jobVertex.getOperatorIDs(); } - public void setMaxParallelism(int maxParallelismDerived) { - - Preconditions.checkState( - !maxParallelismConfigured, - "Attempt to override a configured max parallelism. Configured: " - + this.maxParallelism - + ", argument: " - + maxParallelismDerived); - - setMaxParallelismInternal(maxParallelismDerived); - } - - private void setMaxParallelismInternal(int maxParallelism) { - if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { - maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; - } - - Preconditions.checkArgument( - maxParallelism > 0 - && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, - "Overriding max parallelism is not in valid bounds (1..%s), found: %s", - KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, - maxParallelism); - - this.maxParallelism = maxParallelism; + public void setMaxParallelism(int maxParallelism) { + parallelismInfo.setMaxParallelism(maxParallelism); } public InternalExecutionGraphAccessor getGraph() { @@ -311,12 +277,12 @@ public class ExecutionJobVertex @Override public int getParallelism() { - return parallelism; + return parallelismInfo.getParallelism(); } @Override public int 
getMaxParallelism() { - return maxParallelism; + return parallelismInfo.getMaxParallelism(); } @Override @@ -324,8 +290,8 @@ public class ExecutionJobVertex return resourceProfile; } - public boolean isMaxParallelismConfigured() { - return maxParallelismConfigured; + public boolean canRescaleMaxParallelism(int desiredMaxParallelism) { + return parallelismInfo.canRescaleMaxParallelism(desiredMaxParallelism); } public JobID getJobId() { @@ -379,8 +345,8 @@ public class ExecutionJobVertex new TaskInformation( jobVertex.getID(), jobVertex.getName(), - parallelism, - maxParallelism, + parallelismInfo.getParallelism(), + parallelismInfo.getMaxParallelism(), jobVertex.getInvokableClassName(), jobVertex.getConfiguration()); @@ -399,7 +365,7 @@ public class ExecutionJobVertex num[vertex.getExecutionState().ordinal()]++; } - return getAggregateJobVertexState(num, parallelism); + return getAggregateJobVertexState(num, this.parallelismInfo.getParallelism()); } // --------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/MutableVertexAttemptNumberStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/MutableVertexAttemptNumberStore.java index 1234bc5..9af2ac2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/MutableVertexAttemptNumberStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/MutableVertexAttemptNumberStore.java @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index 3dbb80d..f1721c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -91,6 +91,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, long initializationTimestamp, VertexAttemptNumberStore vertexAttemptNumberStore, + VertexParallelismStore vertexParallelismStore, Logger log) throws Exception { ExecutionDeploymentListener executionDeploymentListener = @@ -122,7 +123,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory { executionDeploymentListener, executionStateUpdateListener, initializationTimestamp, - vertexAttemptNumberStore); + vertexAttemptNumberStore, + vertexParallelismStore); final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java new file mode 100644 index 0000000..e76c254 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.Preconditions; + +import java.util.Optional; +import java.util.function.Function; + +/** A {@link VertexParallelismInformation} implementation that provides common validation. */ +public class DefaultVertexParallelismInfo implements VertexParallelismInformation { + private final int parallelism; + private int maxParallelism; + private final Function<Integer, Optional<String>> rescaleMaxValidator; + + /** + * Create {@link VertexParallelismInformation} with max parallelism rescaling validation for a + * vertex. 
+ * + * @param parallelism the vertex's parallelism + * @param maxParallelism the vertex's max parallelism + * @param rescaleMaxValidator the validation function to provide an error message if a max + * parallelism rescale is not allowed + */ + public DefaultVertexParallelismInfo( + int parallelism, + int maxParallelism, + Function<Integer, Optional<String>> rescaleMaxValidator) { + this.parallelism = checkParallelism(parallelism); + this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism); + this.rescaleMaxValidator = Preconditions.checkNotNull(rescaleMaxValidator); + } + + private static int normalizeAndCheckMaxParallelism(int maxParallelism) { + if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) { + maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + } + + return checkBounds("max parallelism", maxParallelism); + } + + private static int checkParallelism(int parallelism) { + return checkBounds("parallelism", parallelism); + } + + private static int checkBounds(String name, int parallelism) { + Preconditions.checkArgument( + parallelism > 0 + && parallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + "Setting %s is not in valid bounds (1..%s), found: %s", + name, + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, + parallelism); + return parallelism; + } + + @Override + public int getParallelism() { + return this.parallelism; + } + + @Override + public int getMaxParallelism() { + return this.maxParallelism; + } + + @Override + public void setMaxParallelism(int maxParallelism) { + maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism); + + Optional<String> validationResult = rescaleMaxValidator.apply(maxParallelism); + if (validationResult.isPresent()) { + throw new IllegalArgumentException( + String.format( + "Rescaling max parallelism from %s to %s is not allowed: %s", + this.maxParallelism, maxParallelism, validationResult.get())); + } + + this.maxParallelism = maxParallelism; + } + + 
@Override + public boolean canRescaleMaxParallelism(int desiredMaxParallelism) { + // Technically a valid parallelism value, but one that cannot be rescaled to + if (desiredMaxParallelism == JobVertex.MAX_PARALLELISM_DEFAULT) { + return false; + } + + return !rescaleMaxValidator + .apply(normalizeAndCheckMaxParallelism(desiredMaxParallelism)) + .isPresent(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java new file mode 100644 index 0000000..a922a9e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** Maintains the configured parallelisms for vertices, which should be defined by a scheduler. 
*/ +public class DefaultVertexParallelismStore implements MutableVertexParallelismStore { + private final Map<JobVertexID, VertexParallelismInformation> vertexToParallelismInfo = + new HashMap<>(); + + @Override + public void setParallelismInfo(JobVertexID vertexId, VertexParallelismInformation info) { + vertexToParallelismInfo.put(vertexId, info); + } + + @Override + public VertexParallelismInformation getParallelismInfo(JobVertexID vertexId) { + return Optional.ofNullable(vertexToParallelismInfo.get(vertexId)) + .orElseThrow( + () -> + new IllegalStateException( + String.format( + "No parallelism information set for vertex %s", + vertexId))); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java index f7fed50..b38dc10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java @@ -42,6 +42,8 @@ public interface ExecutionGraphFactory { * @param initializationTimestamp initializationTimestamp when the ExecutionGraph was created * @param vertexAttemptNumberStore vertexAttemptNumberStore keeping information about the vertex * attempts of previous runs + * @param vertexParallelismStore vertexMaxParallelismStore keeping information about the vertex + * max parallelism settings * @param log log to use for logging * @return restored {@link ExecutionGraph} * @throws Exception if the {@link ExecutionGraph} could not be created and restored @@ -54,6 +56,7 @@ public interface ExecutionGraphFactory { TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, long initializationTimestamp, VertexAttemptNumberStore vertexAttemptNumberStore, + VertexParallelismStore vertexParallelismStore, Logger log) throws Exception; } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java new file mode 100644 index 0000000..fc22247 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +/** Mutability extension to the {@link VertexParallelismStore}. */ +public interface MutableVertexParallelismStore extends VertexParallelismStore { + /** + * Sets the parallelism properties for the given vertex. 
+ * + * @param vertexId vertex to set parallelism for + * @param info parallelism information for the given vertex + */ + void setParallelismInfo(JobVertexID vertexId, VertexParallelismInformation info); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index f75bde1..ca999af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.CheckpointingOptions; @@ -57,6 +58,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; @@ -79,6 +81,7 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.util.BoundedFIFOQueue; import org.apache.flink.runtime.util.IntArrayList; import org.apache.flink.util.ExceptionUtils; @@ -239,6 +242,61 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling } } + /** + * Compute the {@link VertexParallelismStore} for all given vertices, which will set defaults + * and ensure that the returned store contains valid parallelisms. + * + * @param vertices the vertices to compute parallelism for + * @return the computed parallelism store + */ + public static VertexParallelismStore computeVertexParallelismStore( + Iterable<JobVertex> vertices) { + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + + for (JobVertex vertex : vertices) { + int parallelism = vertex.getParallelism(); + if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) { + parallelism = 1; + } + + int maxParallelism = vertex.getMaxParallelism(); + final boolean autoConfigured; + // if no max parallelism was configured by the user, we calculate and set a default + if (maxParallelism == JobVertex.MAX_PARALLELISM_DEFAULT) { + maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism); + autoConfigured = true; + } else { + autoConfigured = false; + } + + VertexParallelismInformation parallelismInfo = + new DefaultVertexParallelismInfo( + parallelism, + maxParallelism, + // Allow rescaling if the max parallelism was not set explicitly by the + // user + (newMax) -> + autoConfigured + ? Optional.empty() + : Optional.of( + "Cannot override a configured max parallelism.")); + store.setParallelismInfo(vertex.getID(), parallelismInfo); + } + + return store; + } + + /** + * Compute the {@link VertexParallelismStore} for all vertices of a given job graph, which will + * set defaults and ensure that the returned store contains valid parallelisms. 
+ * + * @param jobGraph the job graph to retrieve vertices from + * @return the computed parallelism store + */ + public static VertexParallelismStore computeVertexParallelismStore(JobGraph jobGraph) { + return computeVertexParallelismStore(jobGraph.getVertices()); + } + private ExecutionGraph createAndRestoreExecutionGraph( CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, @@ -258,6 +316,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling jobGraph.getJobType()), initializationTimestamp, new DefaultVertexAttemptNumberStore(), + computeVertexParallelismStore(jobGraph), log); newExecutionGraph.setInternalTaskFailuresListener( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismInformation.java new file mode 100644 index 0000000..1a9ba0c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismInformation.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +/** + * Manages the parallelism properties for a vertex in the execution graph, as well as how they can + * change during runtime. + */ +public interface VertexParallelismInformation { + /** + * Returns a vertex's parallelism. + * + * @return the parallelism for the vertex + */ + int getParallelism(); + + /** + * Returns the vertex's max parallelism. + * + * @return the max parallelism for the vertex + */ + int getMaxParallelism(); + + /** + * Changes a given vertex's max parallelism property. The caller should first check the validity + * of the new setting via {@link #canRescaleMaxParallelism}, otherwise this operation may fail. + * + * @param maxParallelism the new max parallelism for the vertex + */ + void setMaxParallelism(int maxParallelism); + + /** + * Returns whether the vertex's max parallelism can be changed to a given value. + * + * @param desiredMaxParallelism the desired max parallelism for the vertex + * @return whether the max parallelism can be changed to the given value + */ + boolean canRescaleMaxParallelism(int desiredMaxParallelism); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java new file mode 100644 index 0000000..8bd5adb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +/** + * Contains the max parallelism per vertex, along with metadata about how these maxes were + * calculated. + */ +public interface VertexParallelismStore { + /** + * Returns a given vertex's parallelism information. + * + * @param vertexId vertex for which the parallelism information should be returned + * @return a parallelism information for the given vertex + * @throws IllegalStateException if there is no parallelism information for the given vertex + */ + VertexParallelismInformation getParallelismInfo(JobVertexID vertexId); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 1ba3212..f90e723 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -22,6 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.metrics.Gauge; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.JobException; @@ -79,19 +81,25 
@@ import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler; +import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo; +import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore; import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; +import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerUtils; import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ReactiveScaleUpController; import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ScaleUpController; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -148,6 +156,7 @@ public class AdaptiveScheduler private static final Logger LOG = LoggerFactory.getLogger(AdaptiveScheduler.class); private final JobGraphJobInformation jobInformation; + private final VertexParallelismStore initialParallelismStore; private final 
DeclarativeSlotPool declarativeSlotPool; @@ -191,6 +200,8 @@ public class AdaptiveScheduler private BackgroundTask<ExecutionGraph> backgroundTask = BackgroundTask.finishedBackgroundTask(); + private final SchedulerExecutionMode executionMode; + public AdaptiveScheduler( JobGraph jobGraph, Configuration configuration, @@ -212,7 +223,13 @@ public class AdaptiveScheduler assertPreconditions(jobGraph); - this.jobInformation = new JobGraphJobInformation(jobGraph); + this.executionMode = configuration.get(JobManagerOptions.SCHEDULER_MODE); + + VertexParallelismStore vertexParallelismStore = + computeVertexParallelismStore(jobGraph, executionMode); + this.initialParallelismStore = vertexParallelismStore; + this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore); + this.declarativeSlotPool = declarativeSlotPool; this.initializationTimestamp = initializationTimestamp; this.ioExecutor = ioExecutor; @@ -270,6 +287,92 @@ public class AdaptiveScheduler } } + /** + * Creates the parallelism store for a set of vertices, optionally with a flag to leave the + * vertex parallelism unchanged. If the flag is set, the parallelisms must be valid for + * execution. + * + * <p>We need to set parallelism to the max possible value when requesting resources, but when + * executing the graph we should respect what we are actually given. + * + * @param vertices The vertices to store parallelism information for + * @param adjustParallelisms Whether to adjust the parallelisms + * @return The parallelism store. 
+ */ + @VisibleForTesting + static VertexParallelismStore computeReactiveModeVertexParallelismStore( + Iterable<JobVertex> vertices, boolean adjustParallelisms) { + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + + for (JobVertex vertex : vertices) { + // if no max parallelism was configured by the user, we calculate and set a default + final int parallelism; + final int maxParallelism; + if (adjustParallelisms) { + maxParallelism = + vertex.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT + ? KeyGroupRangeAssignment.computeDefaultMaxParallelism( + vertex.getParallelism()) + : vertex.getMaxParallelism(); + parallelism = maxParallelism; + } else { + parallelism = vertex.getParallelism(); + maxParallelism = vertex.getMaxParallelism(); + } + + VertexParallelismInformation parallelismInfo = + new DefaultVertexParallelismInfo( + parallelism, + maxParallelism, + // Allow rescaling if the new desired max parallelism + // is not less than what was declared here during scheduling. + // This prevents the situation where more resources are requested + // based on the computed default, when actually fewer are necessary. + (newMax) -> + newMax >= maxParallelism + ? Optional.empty() + : Optional.of( + "Cannot lower max parallelism in Reactive mode.")); + store.setParallelismInfo(vertex.getID(), parallelismInfo); + } + + return store; + } + + /** + * Creates the parallelism store that should be used for determining scheduling requirements, + * which may choose different parallelisms than set in the {@link JobGraph} depending on the + * execution mode. + * + * @param jobGraph The job graph for execution. + * @param executionMode The mode of scheduler execution. + * @return The parallelism store. 
+ */ + private static VertexParallelismStore computeVertexParallelismStore( + JobGraph jobGraph, SchedulerExecutionMode executionMode) { + if (executionMode == SchedulerExecutionMode.REACTIVE) { + return computeReactiveModeVertexParallelismStore(jobGraph.getVertices(), true); + } + return SchedulerBase.computeVertexParallelismStore(jobGraph); + } + + /** + * Creates the parallelism store that should be used to build the {@link ExecutionGraph}, which + * will respect the vertex parallelism of the passed {@link JobGraph} in all execution modes. + * + * @param jobGraph The job graph for execution. + * @param executionMode The mode of scheduler execution. + * @return The parallelism store. + */ + @VisibleForTesting + static VertexParallelismStore computeVertexParallelismStoreForExecution( + JobGraph jobGraph, SchedulerExecutionMode executionMode) { + if (executionMode == SchedulerExecutionMode.REACTIVE) { + return computeReactiveModeVertexParallelismStore(jobGraph.getVertices(), false); + } + return SchedulerBase.computeVertexParallelismStore(jobGraph); + } + private void newResourcesAvailable(Collection<? 
extends PhysicalSlot> physicalSlots) { state.tryRun( ResourceConsumer.class, @@ -778,21 +881,33 @@ public class AdaptiveScheduler private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> createExecutionGraphWithAvailableResourcesAsync() { - final JobGraph adjustedJobGraph; final VertexParallelism vertexParallelism; + final VertexParallelismStore adjustedParallelismStore; try { vertexParallelism = determineParallelism(slotAllocator); + JobGraph adjustedJobGraph = jobInformation.copyJobGraph(); - adjustedJobGraph = jobInformation.copyJobGraph(); for (JobVertex vertex : adjustedJobGraph.getVertices()) { - vertex.setParallelism(vertexParallelism.getParallelism(vertex.getID())); + JobVertexID id = vertex.getID(); + + // use the originally computed max parallelism for constant runs, + // and the determined "available parallelism" to use + // the resources we have access to + vertex.setParallelism(vertexParallelism.getParallelism(id)); + + VertexParallelismInformation vertexParallelismInfo = + initialParallelismStore.getParallelismInfo(id); + vertex.setMaxParallelism(vertexParallelismInfo.getMaxParallelism()); } + + adjustedParallelismStore = + computeVertexParallelismStoreForExecution(adjustedJobGraph, executionMode); } catch (Exception exception) { return FutureUtils.completedExceptionally(exception); } - return createExecutionGraphAndRestoreStateAsync(adjustedJobGraph) + return createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore) .thenApply( executionGraph -> CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( @@ -838,28 +953,30 @@ public class AdaptiveScheduler } private CompletableFuture<ExecutionGraph> createExecutionGraphAndRestoreStateAsync( - JobGraph adjustedJobGraph) { + VertexParallelismStore adjustedParallelismStore) { backgroundTask.abort(); backgroundTask = backgroundTask.runAfter( - () -> createExecutionGraphAndRestoreState(adjustedJobGraph), ioExecutor); + () -> 
createExecutionGraphAndRestoreState(adjustedParallelismStore), + ioExecutor); return FutureUtils.switchExecutor( backgroundTask.getResultFuture(), getMainThreadExecutor()); } @Nonnull - private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph) - throws Exception { + private ExecutionGraph createExecutionGraphAndRestoreState( + VertexParallelismStore adjustedParallelismStore) throws Exception { return executionGraphFactory.createAndRestoreExecutionGraph( - adjustedJobGraph, + jobInformation.copyJobGraph(), completedCheckpointStore, checkpointsCleaner, checkpointIdCounter, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN, initializationTimestamp, vertexAttemptNumberStore, + adjustedParallelismStore, LOG); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java index 707d21f..849a5b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java @@ -22,6 +22,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation; import org.apache.flink.util.InstantiationUtil; @@ -36,11 +38,14 @@ public class JobGraphJobInformation implements JobInformation { private final JobGraph jobGraph; private final JobID jobID; private final String name; + private final VertexParallelismStore vertexParallelismStore; - public 
JobGraphJobInformation(JobGraph jobGraph) { + public JobGraphJobInformation( + JobGraph jobGraph, VertexParallelismStore vertexParallelismStore) { this.jobGraph = jobGraph; this.jobID = jobGraph.getJobID(); this.name = jobGraph.getName(); + this.vertexParallelismStore = vertexParallelismStore; } @Override @@ -50,7 +55,9 @@ public class JobGraphJobInformation implements JobInformation { @Override public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) { - return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId)); + return new JobVertexInformation( + jobGraph.findVertexByID(jobVertexId), + vertexParallelismStore.getParallelismInfo(jobVertexId)); } public JobID getJobID() { @@ -62,12 +69,8 @@ public class JobGraphJobInformation implements JobInformation { } public Iterable<JobInformation.VertexInformation> getVertices() { - return jobGraphVerticesToVertexInformation(jobGraph.getVertices()); - } - - public static Iterable<JobInformation.VertexInformation> jobGraphVerticesToVertexInformation( - Iterable<JobVertex> verticesIterable) { - return Iterables.transform(verticesIterable, JobVertexInformation::new); + return Iterables.transform( + jobGraph.getVertices(), (vertex) -> getVertexInformation(vertex.getID())); } /** Returns a copy of a jobGraph that can be mutated. 
*/ @@ -79,8 +82,12 @@ public class JobGraphJobInformation implements JobInformation { private final JobVertex jobVertex; - private JobVertexInformation(JobVertex jobVertex) { + private final VertexParallelismInformation parallelismInfo; + + private JobVertexInformation( + JobVertex jobVertex, VertexParallelismInformation parallelismInfo) { this.jobVertex = jobVertex; + this.parallelismInfo = parallelismInfo; } @Override @@ -90,7 +97,7 @@ public class JobGraphJobInformation implements JobInformation { @Override public int getParallelism() { - return jobVertex.getParallelism(); + return parallelismInfo.getParallelism(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java deleted file mode 100644 index c0d9cfa..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.scheduler.adaptive; - -import org.apache.flink.api.dag.Transformation; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Utilities for reactive mode. */ -public final class ReactiveModeUtils { - private static final Logger LOG = LoggerFactory.getLogger(ReactiveModeUtils.class); - - /** - * Sets the parallelism of all vertices in the passed JobGraph to the highest possible max - * parallelism, unless the user defined a maxParallelism. - * - * @param jobGraph The JobGraph to modify. - */ - public static void configureJobGraphForReactiveMode(JobGraph jobGraph) { - LOG.info("Modifying job parallelism for running in reactive mode."); - for (JobVertex vertex : jobGraph.getVertices()) { - if (vertex.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT) { - vertex.setParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM); - vertex.setMaxParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM); - } else { - vertex.setParallelism(vertex.getMaxParallelism()); - } - } - } - - private ReactiveModeUtils() {} -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java index 4b6cb4b..c809573 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java @@ -45,6 +45,7 @@ import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleto import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -213,7 
+214,7 @@ public class CheckpointMetadataLoadingTest { .thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorId))); if (parallelism != maxParallelism) { - when(vertex.isMaxParallelismConfigured()).thenReturn(true); + when(vertex.canRescaleMaxParallelism(anyInt())).thenReturn(false); } Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java index 5b7ff17..31e9473 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; @@ -41,6 +42,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; @@ -57,8 +60,10 @@ import static org.mockito.Mockito.when; */ public class DefaultExecutionGraphConstructionTest { - private ExecutionGraph createDefaultExecutionGraph() throws Exception { - return TestingDefaultExecutionGraphBuilder.newBuilder().build(); + private ExecutionGraph createDefaultExecutionGraph(List<JobVertex> vertices) throws Exception { + return TestingDefaultExecutionGraphBuilder.newBuilder() + 
.setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(vertices)) + .build(); } @Test @@ -77,8 +82,8 @@ public class DefaultExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3)); - ExecutionGraph eg1 = createDefaultExecutionGraph(); - ExecutionGraph eg2 = createDefaultExecutionGraph(); + ExecutionGraph eg1 = createDefaultExecutionGraph(ordered); + ExecutionGraph eg2 = createDefaultExecutionGraph(ordered); eg1.attachJobGraph(ordered); eg2.attachJobGraph(ordered); @@ -135,7 +140,7 @@ public class DefaultExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); - ExecutionGraph eg = createDefaultExecutionGraph(); + ExecutionGraph eg = createDefaultExecutionGraph(ordered); try { eg.attachJobGraph(ordered); } catch (JobException e) { @@ -172,18 +177,6 @@ public class DefaultExecutionGraphConstructionTest { IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED); - List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); - - ExecutionGraph eg = createDefaultExecutionGraph(); - try { - eg.attachJobGraph(ordered); - } catch (JobException e) { - e.printStackTrace(); - fail("Job failed with exception: " + e.getMessage()); - } - - // attach the second part of the graph - JobVertex v4 = new JobVertex("vertex4"); JobVertex v5 = new JobVertex("vertex5"); v4.setParallelism(11); @@ -198,7 +191,22 @@ public class DefaultExecutionGraphConstructionTest { v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); v5.connectDataSetAsInput(v3result_2, DistributionPattern.ALL_TO_ALL); - List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5)); + List<JobVertex> ordered = Arrays.asList(v1, v2, v3); + + List<JobVertex> ordered2 = Arrays.asList(v4, v5); + + ExecutionGraph eg = + createDefaultExecutionGraph( + Stream.concat(ordered.stream(), ordered2.stream()) + 
.collect(Collectors.toList())); + try { + eg.attachJobGraph(ordered); + } catch (JobException e) { + e.printStackTrace(); + fail("Job failed with exception: " + e.getMessage()); + } + + // attach the second part of the graph try { eg.attachJobGraph(ordered2); @@ -237,18 +245,7 @@ public class DefaultExecutionGraphConstructionTest { IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED); - List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); - - ExecutionGraph eg = createDefaultExecutionGraph(); - try { - eg.attachJobGraph(ordered); - } catch (JobException e) { - e.printStackTrace(); - fail("Job failed with exception: " + e.getMessage()); - } - - // attach the second part of the graph - + // construct part two of the execution graph JobVertex v4 = new JobVertex("vertex4"); JobVertex v5 = new JobVertex("vertex5"); v4.setParallelism(11); @@ -263,8 +260,21 @@ public class DefaultExecutionGraphConstructionTest { v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); v5.connectIdInput(v3result_2.getId(), DistributionPattern.ALL_TO_ALL); - List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5)); + List<JobVertex> ordered = Arrays.asList(v1, v2, v3); + List<JobVertex> ordered2 = Arrays.asList(v4, v5); + ExecutionGraph eg = + createDefaultExecutionGraph( + Stream.concat(ordered.stream(), ordered2.stream()) + .collect(Collectors.toList())); + try { + eg.attachJobGraph(ordered); + } catch (JobException e) { + e.printStackTrace(); + fail("Job failed with exception: " + e.getMessage()); + } + + // attach the second part of the graph try { eg.attachJobGraph(ordered2); } catch (JobException e) { @@ -302,9 +312,18 @@ public class DefaultExecutionGraphConstructionTest { v1.setParallelism(7); v1.setInvokableClass(AbstractInvokable.class); - List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1)); + // construct part two of the execution graph + JobVertex v2 = new 
JobVertex("vertex2"); + v2.setInvokableClass(AbstractInvokable.class); + v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.ALL_TO_ALL); + + List<JobVertex> ordered = Arrays.asList(v1); + List<JobVertex> ordered2 = Arrays.asList(v2); - ExecutionGraph eg = createDefaultExecutionGraph(); + ExecutionGraph eg = + createDefaultExecutionGraph( + Stream.concat(ordered.stream(), ordered2.stream()) + .collect(Collectors.toList())); try { eg.attachJobGraph(ordered); } catch (JobException e) { @@ -313,12 +332,6 @@ public class DefaultExecutionGraphConstructionTest { } // attach the second part of the graph - JobVertex v2 = new JobVertex("vertex2"); - v2.setInvokableClass(AbstractInvokable.class); - v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.ALL_TO_ALL); - - List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v2)); - try { eg.attachJobGraph(ordered2); fail("Attached wrong jobgraph"); @@ -360,7 +373,7 @@ public class DefaultExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4)); - ExecutionGraph eg = createDefaultExecutionGraph(); + ExecutionGraph eg = createDefaultExecutionGraph(ordered); try { eg.attachJobGraph(ordered); fail("Attached wrong jobgraph"); @@ -425,7 +438,7 @@ public class DefaultExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); - ExecutionGraph eg = createDefaultExecutionGraph(); + ExecutionGraph eg = createDefaultExecutionGraph(ordered); try { eg.attachJobGraph(ordered); } catch (JobException e) { @@ -459,7 +472,7 @@ public class DefaultExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); - ExecutionGraph eg = createDefaultExecutionGraph(); + ExecutionGraph eg = createDefaultExecutionGraph(ordered); try { eg.attachJobGraph(ordered); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index c78e8e4..b38ffb1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -347,9 +347,14 @@ public class ExecutionGraphTestUtils { } public static JobVertex createNoOpVertex(String name, int parallelism) { + return createNoOpVertex(name, parallelism, JobVertex.MAX_PARALLELISM_DEFAULT); + } + + public static JobVertex createNoOpVertex(String name, int parallelism, int maxParallelism) { JobVertex vertex = new JobVertex(name); vertex.setInvokableClass(NoOpInvokable.class); vertex.setParallelism(parallelism); + vertex.setMaxParallelism(maxParallelism); return vertex; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java index 756516f..cf37e87 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java @@ -22,110 +22,23 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.junit.Assert; import org.junit.Test; -public class ExecutionJobVertexTest { - - private static final int NOT_CONFIGURED = -1; +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +/** Test for {@link ExecutionJobVertex} */ +public class ExecutionJobVertexTest { @Test - public void testMaxParallelismDefaulting() throws Exception { - - // default minimum - ExecutionJobVertex executionJobVertex = 
createExecutionJobVertex(1, NOT_CONFIGURED); - Assert.assertEquals(128, executionJobVertex.getMaxParallelism()); - - // test round up part 1 - executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED); - Assert.assertEquals(256, executionJobVertex.getMaxParallelism()); - - // test round up part 2 - executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED); - Assert.assertEquals(512, executionJobVertex.getMaxParallelism()); - - // test round up limit - executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED); - Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism()); - - // test upper bound - try { - executionJobVertex = createExecutionJobVertex(1 + (1 << 15), NOT_CONFIGURED); - executionJobVertex.getMaxParallelism(); - Assert.fail(); - } catch (IllegalArgumentException ignore) { - } - - // parallelism must be smaller than the max parallelism - try { - createExecutionJobVertex(172, 4); - Assert.fail( - "We should not be able to create an ExecutionJobVertex which " - + "has a smaller max parallelism than parallelism."); - } catch (JobException ignored) { - // expected - } - - // test configured / trumps computed default - executionJobVertex = createExecutionJobVertex(4, 1 << 15); - Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism()); - - // test upper bound configured - try { - executionJobVertex = createExecutionJobVertex(4, 1 + (1 << 15)); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalArgumentException ignore) { - } - - // test lower bound configured - try { - executionJobVertex = createExecutionJobVertex(4, 0); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalArgumentException ignore) { - } - - // test override trumps test configured 2 - executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); - executionJobVertex.setMaxParallelism(7); - Assert.assertEquals(7, executionJobVertex.getMaxParallelism()); - - // test 
lower bound with derived value - executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); - try { - executionJobVertex.setMaxParallelism(0); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalArgumentException ignore) { - } - - // test upper bound with derived value - executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED); - try { - executionJobVertex.setMaxParallelism(1 + (1 << 15)); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalArgumentException ignore) { - } - - // test complain on setting derived value in presence of a configured value - executionJobVertex = createExecutionJobVertex(4, 16); - try { - executionJobVertex.setMaxParallelism(7); - Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism())); - } catch (IllegalStateException ignore) { - } - } - - // ------------------------------------------------------------------------------------------------------ - - public static ExecutionJobVertex createExecutionJobVertex( - int parallelism, int preconfiguredMaxParallelism) throws Exception { + public void testParallelismGreaterThanMaxParallelism() { JobVertex jobVertex = new JobVertex("testVertex"); jobVertex.setInvokableClass(AbstractInvokable.class); - jobVertex.setParallelism(parallelism); - - if (NOT_CONFIGURED != preconfiguredMaxParallelism) { - jobVertex.setMaxParallelism(preconfiguredMaxParallelism); - } + // parallelism must be smaller than the max parallelism + jobVertex.setParallelism(172); + jobVertex.setMaxParallelism(4); - return ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex); + assertThrows( + "higher than the max parallelism", + JobException.class, + () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 
75105a4..cba3bdb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.junit.Test; @@ -173,10 +174,6 @@ public class PointwisePatternTest { testConnections(10, 6, new int[][] {{0}, {1, 2}, {3, 4}, {5}, {6, 7}, {8, 9}}); } - private ExecutionGraph getDummyExecutionGraph() throws Exception { - return TestingDefaultExecutionGraphBuilder.newBuilder().build(); - } - private void testLowToHigh(int lowDop, int highDop) throws Exception { if (highDop < lowDop) { throw new IllegalArgumentException(); @@ -254,7 +251,11 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2)); - ExecutionGraph eg = getDummyExecutionGraph(); + ExecutionGraph eg = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setVertexParallelismStore( + SchedulerBase.computeVertexParallelismStore(ordered)) + .build(); try { eg.attachJobGraph(ordered); } catch (JobException e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index 52c46a6..1a4180e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -37,6 +37,8 @@ import 
org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.scheduler.SchedulerBase; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -44,6 +46,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -74,6 +77,7 @@ public class TestingDefaultExecutionGraphBuilder { private ExecutionDeploymentListener executionDeploymentListener = NoOpExecutionDeploymentListener.get(); private ExecutionStateUpdateListener executionStateUpdateListener = (execution, newState) -> {}; + private VertexParallelismStore vertexParallelismStore; private TestingDefaultExecutionGraphBuilder() {} @@ -158,6 +162,12 @@ public class TestingDefaultExecutionGraphBuilder { return this; } + public TestingDefaultExecutionGraphBuilder setVertexParallelismStore( + VertexParallelismStore store) { + this.vertexParallelismStore = store; + return this; + } + public DefaultExecutionGraph build() throws JobException, JobExecutionException { return DefaultExecutionGraphBuilder.buildGraph( jobGraph, @@ -179,6 +189,8 @@ public class TestingDefaultExecutionGraphBuilder { executionDeploymentListener, executionStateUpdateListener, System.currentTimeMillis(), - new DefaultVertexAttemptNumberStore()); + new DefaultVertexAttemptNumberStore(), + Optional.ofNullable(vertexParallelismStore) + .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph))); } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index c06ee09..3c56d35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.SchedulerBase; import org.junit.Test; @@ -79,7 +80,11 @@ public class VertexSlotSharingTest { List<JobVertex> vertices = new ArrayList<>(Arrays.asList(v1, v2, v3, v4, v5)); - ExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().build(); + ExecutionGraph eg = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setVertexParallelismStore( + SchedulerBase.computeVertexParallelismStore(vertices)) + .build(); eg.attachJobGraph(vertices); // verify that the vertices are all in the same slot sharing group diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java index 2a57d04..c838e93 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java @@ -76,6 +76,7 @@ public class DefaultExecutionGraphFactoryTest extends TestLogger { TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), + SchedulerBase.computeVertexParallelismStore(jobGraphWithNewOperator), 
log); fail("Expected ExecutionGraph creation to fail because of non restored state."); } catch (Exception e) { @@ -103,6 +104,7 @@ public class DefaultExecutionGraphFactoryTest extends TestLogger { TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), + SchedulerBase.computeVertexParallelismStore(jobGraphWithNewOperator), log); final CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint(false); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfoTest.java new file mode 100644 index 0000000..b09154b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfoTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; +import java.util.function.Function; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; + +/** Tests for the {@link DefaultVertexParallelismInfo}. */ +public class DefaultVertexParallelismInfoTest extends TestLogger { + private static final Function<Integer, Optional<String>> ALWAYS_VALID = + (max) -> Optional.empty(); + + @Test + public void parallelismInvalid() { + assertThrows( + "parallelism is not in valid bounds", + IllegalArgumentException.class, + () -> new DefaultVertexParallelismInfo(-1, 1, ALWAYS_VALID)); + } + + @Test + public void maxParallelismInvalid() { + assertThrows( + "max parallelism is not in valid bounds", + IllegalArgumentException.class, + () -> new DefaultVertexParallelismInfo(1, -1, ALWAYS_VALID)); + } + + @Test + public void setAutoMax() { + DefaultVertexParallelismInfo info = + new DefaultVertexParallelismInfo( + 1, ExecutionConfig.PARALLELISM_AUTO_MAX, ALWAYS_VALID); + + Assert.assertEquals( + KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, info.getMaxParallelism()); + } + + @Test + public void canRescaleMaxOutOfBounds() { + DefaultVertexParallelismInfo info = new DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID); + + assertThrows( + "not in valid bounds", + IllegalArgumentException.class, + () -> info.canRescaleMaxParallelism(-4)); + } + + @Test + public void canRescaleMaxAuto() { + DefaultVertexParallelismInfo info = new DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID); + + Assert.assertTrue(info.canRescaleMaxParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX)); + } + + @Test + public void canRescaleMax() { + DefaultVertexParallelismInfo info = new 
DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID); + + Assert.assertTrue(info.canRescaleMaxParallelism(3)); + } + + @Test + public void canRescaleMaxDefault() { + DefaultVertexParallelismInfo info = new DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID); + + Assert.assertFalse(info.canRescaleMaxParallelism(JobVertex.MAX_PARALLELISM_DEFAULT)); + } + + @Test + public void setMaxOutOfBounds() { + DefaultVertexParallelismInfo info = new DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID); + + assertThrows( + "not in valid bounds", + IllegalArgumentException.class, + () -> { + info.setMaxParallelism(-4); + return null; + }); + } + + @Test + public void setMaxInvalid() { + DefaultVertexParallelismInfo info = + new DefaultVertexParallelismInfo(1, 1, (max) -> Optional.of("not valid")); + + assertThrows( + "not valid", + IllegalArgumentException.class, + () -> { + info.setMaxParallelism(4); + return null; + }); + } + + @Test + public void setMaxValid() { + DefaultVertexParallelismInfo info = new DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID); + + info.setMaxParallelism(40); + + Assert.assertEquals(40, info.getMaxParallelism()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java new file mode 100644 index 0000000..d4ecf73 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; + +/** Tests for the {@link DefaultVertexParallelismStore}. */ +public class DefaultVertexParallelismStoreTest extends TestLogger { + @Test + public void testNotSet() { + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + + assertThrows( + "No parallelism information set for vertex", + IllegalStateException.class, + () -> store.getParallelismInfo(new JobVertexID())); + } + + @Test + public void testSetInfo() { + JobVertexID id = new JobVertexID(); + VertexParallelismInformation info = new MockVertexParallelismInfo(); + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + + store.setParallelismInfo(id, info); + + VertexParallelismInformation storedInfo = store.getParallelismInfo(id); + + Assert.assertEquals(info, storedInfo); + } + + private static final class MockVertexParallelismInfo implements VertexParallelismInformation { + @Override + public int getParallelism() { + return 0; + } + + @Override + public int getMaxParallelism() { + return 0; + } + + @Override + public void setMaxParallelism(int maxParallelism) {} + + @Override + public boolean canRescaleMaxParallelism(int desiredMaxParallelism) { + return false; + } + } +} diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseComputeVertexParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseComputeVertexParallelismTest.java new file mode 100644 index 0000000..af80209 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseComputeVertexParallelismTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collections; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; + +/** Test vertex parallelism configuration for the {@link SchedulerBase}. 
*/ +@RunWith(Parameterized.class) +public class SchedulerBaseComputeVertexParallelismTest extends TestLogger { + @Parameterized.Parameters( + name = + "parallelism = {0}, maxParallelism = {1}, expected max = {2}, rescale to = {3}, can rescale = {4}") + public static Object[][] data() { + return new Object[][] { + // default minimum + {1, JobVertex.MAX_PARALLELISM_DEFAULT, 128, 3, true}, + // test round up part 1 + {171, JobVertex.MAX_PARALLELISM_DEFAULT, 256, 42, true}, + // test round up part 2 + {172, JobVertex.MAX_PARALLELISM_DEFAULT, 512, 174, true}, + // test round up limit + { + UPPER_BOUND_MAX_PARALLELISM, + JobVertex.MAX_PARALLELISM_DEFAULT, + UPPER_BOUND_MAX_PARALLELISM, + UPPER_BOUND_MAX_PARALLELISM, + true + }, + // test configured / takes precedence computed default + {4, UPPER_BOUND_MAX_PARALLELISM, UPPER_BOUND_MAX_PARALLELISM, 3, false}, + // test override takes precedence test configured 2 + {4, 7, 7, UPPER_BOUND_MAX_PARALLELISM, false}, + }; + } + + @Parameterized.Parameter(0) + public int parallelism; + + @Parameterized.Parameter(1) + public int maxParallelism; + + @Parameterized.Parameter(2) + public int expectedMaxParallelism; + + @Parameterized.Parameter(3) + public int maxToScaleTo; + + @Parameterized.Parameter(4) + public boolean expectedCanRescaleTo; + + @Test + public void testMaxParallelismDefaulting() { + JobVertex jobVertex = createNoOpVertex("test", parallelism, maxParallelism); + VertexParallelismStore store = + SchedulerBase.computeVertexParallelismStore(Collections.singleton(jobVertex)); + + VertexParallelismInformation info = store.getParallelismInfo(jobVertex.getID()); + + Assert.assertEquals("constant parallelism", parallelism, info.getParallelism()); + Assert.assertEquals("expected max", expectedMaxParallelism, info.getMaxParallelism()); + + Assert.assertEquals( + "can rescale max", + expectedCanRescaleTo, + info.canRescaleMaxParallelism(maxToScaleTo)); + } +} diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerComputeReactiveModeVertexParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerComputeReactiveModeVertexParallelismTest.java new file mode 100644 index 0000000..8ce3fec --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerComputeReactiveModeVertexParallelismTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collections; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assume.assumeThat; + +/** Test vertex parallelism configuration for the {@link AdaptiveScheduler} in Reactive mode. */ +@RunWith(Parameterized.class) +public class AdaptiveSchedulerComputeReactiveModeVertexParallelismTest extends TestLogger { + @Parameterized.Parameters( + name = + "parallelism = {0}, maxParallelism = {1}, expected max = {2}, rescale to = {3}, can rescale = {4}") + public static Object[][] data() { + return new Object[][] { + // default minimum and rescale to higher + {1, JobVertex.MAX_PARALLELISM_DEFAULT, 128, 129, true}, + // test round up part 1 and rescale to lower + {171, JobVertex.MAX_PARALLELISM_DEFAULT, 256, 42, false}, + // test round up part 2 and rescale to equal + {172, JobVertex.MAX_PARALLELISM_DEFAULT, 512, 512, true}, + // test round up limit and rescale to equal + { + UPPER_BOUND_MAX_PARALLELISM, + JobVertex.MAX_PARALLELISM_DEFAULT, + UPPER_BOUND_MAX_PARALLELISM, + UPPER_BOUND_MAX_PARALLELISM, + true + }, + // test configured / takes precedence computed default and rescale to lower + {4, UPPER_BOUND_MAX_PARALLELISM, UPPER_BOUND_MAX_PARALLELISM, 3, false}, + // test override takes precedence test configured 2 and rescale to higher + {4, 7, 7, UPPER_BOUND_MAX_PARALLELISM, true}, + }; + 
} + + @Parameterized.Parameter(0) + public int parallelism; + + @Parameterized.Parameter(1) + public int maxParallelism; + + @Parameterized.Parameter(2) + public int expectedMaxParallelism; + + @Parameterized.Parameter(3) + public int maxToScaleTo; + + @Parameterized.Parameter(4) + public boolean expectedCanRescaleTo; + + @Test + public void testCreateStoreWithoutAdjustedParallelism() { + assumeThat( + "max parallelism must be set", + maxParallelism, + is(not(JobVertex.MAX_PARALLELISM_DEFAULT))); + + JobVertex jobVertex = createNoOpVertex("test", parallelism, maxParallelism); + VertexParallelismStore store = + AdaptiveScheduler.computeReactiveModeVertexParallelismStore( + Collections.singleton(jobVertex), false); + + VertexParallelismInformation info = store.getParallelismInfo(jobVertex.getID()); + + Assert.assertEquals("parallelism is not adjusted", parallelism, info.getParallelism()); + Assert.assertEquals("expected max", expectedMaxParallelism, info.getMaxParallelism()); + + Assert.assertEquals( + "can rescale max", + expectedCanRescaleTo, + info.canRescaleMaxParallelism(maxToScaleTo)); + } + + @Test + public void testCreateStoreWithAdjustedParallelism() { + JobVertex jobVertex = createNoOpVertex("test", parallelism, maxParallelism); + VertexParallelismStore store = + AdaptiveScheduler.computeReactiveModeVertexParallelismStore( + Collections.singleton(jobVertex), true); + + VertexParallelismInformation info = store.getParallelismInfo(jobVertex.getID()); + + Assert.assertEquals( + "parallelism is adjusted to max", expectedMaxParallelism, info.getParallelism()); + Assert.assertEquals("expected max", expectedMaxParallelism, info.getMaxParallelism()); + + Assert.assertEquals( + "can rescale max", + expectedCanRescaleTo, + info.canRescaleMaxParallelism(maxToScaleTo)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 0228427..8859dbb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -30,11 +30,11 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore; -import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; @@ -53,11 +53,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import 
org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool; -import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils; -import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; @@ -66,9 +63,11 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.operators.coordination.TestOperatorEvent; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.scheduler.VertexParallelismInformation; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator; import org.apache.flink.runtime.slots.ResourceRequirement; -import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.ResourceCounter; @@ -87,6 +86,8 @@ import javax.annotation.Nullable; import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -96,8 +97,10 @@ import java.util.concurrent.ScheduledExecutorService; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements; import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots; import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup; @@ -251,9 +254,8 @@ public class AdaptiveSchedulerTest extends TestLogger { final int numAvailableSlots = 1; - final OneShotLatch submitTaskLatch = new OneShotLatch(); - final TaskManagerGateway taskManagerGateway = - createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch); + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(numAvailableSlots); singleThreadMainThreadExecutor.execute( () -> { @@ -266,7 +268,8 @@ public class AdaptiveSchedulerTest extends TestLogger { taskManagerGateway); }); - submitTaskLatch.await(); + // wait for all tasks to be submitted + taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5)); final ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync( @@ -287,15 +290,18 @@ public class AdaptiveSchedulerTest extends TestLogger { final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); + final AdaptiveScheduler adaptiveScheduler = new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) .setInitializationTimestamp(initializationTimestamp) 
.setDeclarativeSlotPool(declarativeSlotPool) + .setJobMasterConfiguration(configuration) .build(); - final OneShotLatch submitTaskLatch = new OneShotLatch(); - final TaskManagerGateway taskManagerGateway = - createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch); + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(PARALLELISM); singleThreadMainThreadExecutor.execute( () -> { @@ -308,7 +314,8 @@ public class AdaptiveSchedulerTest extends TestLogger { taskManagerGateway); }); - submitTaskLatch.await(); + // Wait for just the first submission to indicate the execution graph is ready + taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5)); final ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync( @@ -430,12 +437,8 @@ public class AdaptiveSchedulerTest extends TestLogger { final Gauge<Integer> numRestartsMetric = numRestartsMetricFuture.get(); - final SimpleAckingTaskManagerGateway taskManagerGateway = - new SimpleAckingTaskManagerGateway(); - final BlockingQueue<AllocationID> submittedTasks = new ArrayBlockingQueue<>(5); - taskManagerGateway.setSubmitConsumer( - taskDeploymentDescriptor -> - submittedTasks.offer(taskDeploymentDescriptor.getAllocationId())); + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM); taskManagerGateway.setCancelConsumer( executionAttemptId -> @@ -459,7 +462,7 @@ public class AdaptiveSchedulerTest extends TestLogger { }); // wait for the first task submission - submittedTasks.take(); + taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5)); assertThat(numRestartsMetric.getValue(), is(0)); @@ -474,8 +477,8 @@ public class AdaptiveSchedulerTest extends TestLogger { taskManagerGateway); }); - // wait for the second task submission - submittedTasks.take(); + // wait for the second task submissions + taskManagerGateway.waitForSubmissions(PARALLELISM, 
Duration.ofSeconds(5)); assertThat(numRestartsMetric.getValue(), is(1)); } @@ -495,7 +498,7 @@ public class AdaptiveSchedulerTest extends TestLogger { } @Test - public void testStartSchedulingSetsResourceRequirements() throws Exception { + public void testStartSchedulingSetsResourceRequirementsForDefaultMode() throws Exception { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = @@ -513,27 +516,78 @@ public class AdaptiveSchedulerTest extends TestLogger { contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, PARALLELISM))); } - /** Tests that the listener for new slots is properly set up. */ @Test - public void testResourceAcquisitionTriggersJobExecution() throws Exception { + public void testStartSchedulingSetsResourceRequirementsForReactiveMode() throws Exception { final JobGraph jobGraph = createJobGraph(); final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID()); + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE); + final AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor) .setDeclarativeSlotPool(declarativeSlotPool) + .setJobMasterConfiguration(configuration) .build(); scheduler.startScheduling(); - offerSlots( - declarativeSlotPool, - createSlotOffersForResourceRequirements( - ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM))); + // should request the max possible resources + final int expectedParallelism = + KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM); + assertThat( + declarativeSlotPool.getResourceRequirements(), + contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, expectedParallelism))); + } + + /** Tests that the listener for new slots is properly set up. 
*/ + @Test + public void testResourceAcquisitionTriggersJobExecution() throws Exception { + final JobGraph jobGraph = createJobGraph(); + + final DefaultDeclarativeSlotPool declarativeSlotPool = + createDeclarativeSlotPool(jobGraph.getJobID()); + + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); - assertThat(scheduler.getState(), instanceOf(CreatingExecutionGraph.class)); + final AdaptiveScheduler scheduler = + new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) + .setDeclarativeSlotPool(declarativeSlotPool) + .setJobMasterConfiguration(configuration) + .build(); + + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(PARALLELISM); + + CompletableFuture<State> startingStateFuture = new CompletableFuture<>(); + singleThreadMainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + startingStateFuture.complete(scheduler.getState()); + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, PARALLELISM)), + taskManagerGateway); + }); + + assertThat(startingStateFuture.get(), instanceOf(WaitingForResources.class)); + + // Wait for all tasks to be submitted + taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5)); + + final ArchivedExecutionGraph executionGraph = + CompletableFuture.supplyAsync( + () -> scheduler.requestJob().getArchivedExecutionGraph(), + singleThreadMainThreadExecutor) + .get(); + + assertThat( + executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(), is(PARALLELISM)); } @Test @@ -752,22 +806,6 @@ public class AdaptiveSchedulerTest extends TestLogger { scheduler.requestPartitionState(new IntermediateDataSetID(), new ResultPartitionID()); } - @Nonnull - private TaskManagerGateway createWaitingForTaskSubmissionTaskManagerGateway( - OneShotLatch submitTaskLatch) { - 
final TaskManagerGateway taskManagerGateway = - SlotPoolTestUtils.createTaskManagerGateway( - new TestingTaskExecutorGatewayBuilder() - .setSubmitTaskConsumer( - (taskDeploymentDescriptor, jobMasterId) -> { - submitTaskLatch.trigger(); - return CompletableFuture.completedFuture( - Acknowledge.get()); - }) - .createTestingTaskExecutorGateway()); - return taskManagerGateway; - } - @Test public void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() throws Exception { @@ -791,6 +829,41 @@ public class AdaptiveSchedulerTest extends TestLogger { assertFalse(assignmentResult.isSuccess()); } + @Test + public void testComputeVertexParallelismStoreForExecutionInReactiveMode() { + JobVertex v1 = createNoOpVertex("v1", 1, 50); + JobVertex v2 = createNoOpVertex("v2", 50, 50); + JobGraph graph = JobGraphTestUtils.streamingJobGraph(v1, v2); + + VertexParallelismStore parallelismStore = + AdaptiveScheduler.computeVertexParallelismStoreForExecution( + graph, SchedulerExecutionMode.REACTIVE); + + for (JobVertex vertex : graph.getVertices()) { + VertexParallelismInformation info = parallelismStore.getParallelismInfo(vertex.getID()); + + assertThat(info.getParallelism(), is(vertex.getParallelism())); + assertThat(info.getMaxParallelism(), is(vertex.getMaxParallelism())); + } + } + + @Test + public void testComputeVertexParallelismStoreForExecutionInDefaultMode() { + JobVertex v1 = createNoOpVertex("v1", 1, 50); + JobVertex v2 = createNoOpVertex("v2", 50, 50); + JobGraph graph = JobGraphTestUtils.streamingJobGraph(v1, v2); + + VertexParallelismStore parallelismStore = + AdaptiveScheduler.computeVertexParallelismStoreForExecution(graph, null); + + for (JobVertex vertex : graph.getVertices()) { + VertexParallelismInformation info = parallelismStore.getParallelismInfo(vertex.getID()); + + assertThat(info.getParallelism(), is(vertex.getParallelism())); + assertThat(info.getMaxParallelism(), is(vertex.getMaxParallelism())); + } + } + // 
--------------------------------------------------------------------------------------------- // Utils // --------------------------------------------------------------------------------------------- @@ -823,18 +896,44 @@ public class AdaptiveSchedulerTest extends TestLogger { onLeaveCalled = true; onLeaveNewStateArgument = newState; } + } - private static class Factory implements StateFactory<LifecycleMethodCapturingState> { + /** + * A {@link SimpleAckingTaskManagerGateway} that buffers all the task submissions into a + * blocking queue, allowing one to wait for an arbitrary number of submissions. + */ + private static class SubmissionBufferingTaskManagerGateway + extends SimpleAckingTaskManagerGateway { + final BlockingQueue<TaskDeploymentDescriptor> submittedTasks; - @Override - public Class<LifecycleMethodCapturingState> getStateClass() { - return LifecycleMethodCapturingState.class; - } + public SubmissionBufferingTaskManagerGateway(int capacity) { + submittedTasks = new ArrayBlockingQueue<>(capacity); + super.setSubmitConsumer(submittedTasks::offer); + } - @Override - public LifecycleMethodCapturingState getState() { - return new LifecycleMethodCapturingState(); + @Override + public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) { + super.setSubmitConsumer( + ((Consumer<TaskDeploymentDescriptor>) submittedTasks::offer) + .andThen(submitConsumer)); + } + + /** + * Block until an arbitrary number of submissions have been received. 
+ * + * @param numSubmissions The number of submissions to wait for + * @param perTaskTimeout The max amount of time to wait between each submission + * @return the list of the waited-for submissions + * @throws InterruptedException if a timeout is exceeded waiting for a submission + */ + public List<TaskDeploymentDescriptor> waitForSubmissions( + int numSubmissions, Duration perTaskTimeout) throws InterruptedException { + List<TaskDeploymentDescriptor> descriptors = new ArrayList<>(); + for (int i = 0; i < numSubmissions; i++) { + descriptors.add( + submittedTasks.poll(perTaskTimeout.toMillis(), TimeUnit.MILLISECONDS)); } + return descriptors; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index b60f0e2..683b915 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.P import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo; import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; @@ -68,6 +69,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; @@ -694,6 +696,7 @@ public class ExecutingTest extends TestLogger { 1, 
Time.milliseconds(1L), 1L, + new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()), new DefaultSubtaskAttemptNumberStore(Collections.emptyList())); mockExecutionVertex = new MockExecutionVertex(this); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java index 27f77cc..92419c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java @@ -22,6 +22,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.SchedulerBase; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration; import java.util.List; @@ -43,8 +45,13 @@ public class BuildExecutionGraphBenchmark { public void setup(JobConfiguration jobConfiguration) throws Exception { jobVertices = createDefaultJobVertices(jobConfiguration); final JobGraph jobGraph = createJobGraph(jobConfiguration); + final VertexParallelismStore parallelismStore = + SchedulerBase.computeVertexParallelismStore(jobVertices); executionGraph = - TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); + TestingDefaultExecutionGraphBuilder.newBuilder() + .setVertexParallelismStore(parallelismStore) + .setJobGraph(jobGraph) + .build(); } public void buildTopology() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index afd69e6..1484d67 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuild import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; @@ -116,7 +117,11 @@ public class RescalePartitionerTest extends StreamPartitionerTest { assertEquals(4, mapVertex.getParallelism()); assertEquals(2, sinkVertex.getParallelism()); - ExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().build(); + ExecutionGraph eg = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setVertexParallelismStore( + SchedulerBase.computeVertexParallelismStore(jobGraph)) + .build(); try { eg.attachJobGraph(jobVertices); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index cc7b3c3..26de8ec 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -36,8 +36,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobStatusMessage; import 
org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -234,7 +234,7 @@ public class RescalingITCase extends TestLogger { } int restoreMaxParallelism = - deriveMaxParallelism ? ExecutionJobVertex.VALUE_NOT_SET : maxParallelism; + deriveMaxParallelism ? JobVertex.MAX_PARALLELISM_DEFAULT : maxParallelism; JobGraph scaledJobGraph = createJobGraphWithKeyedState(