This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4597d55  [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
4597d55 is described below

commit 4597d5557c640e0ef5a526cbb6d46686be5dd813
Author: Austin Cawley-Edwards <austin.caw...@gmail.com>
AuthorDate: Wed Mar 31 04:38:24 2021 -0400

    [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
    
    This moves the configuration and management of vertex parallelism into the
    control of the scheduler, instead of the ExecutionJobVertex. This gives the
    Adaptive scheduler assurances about the task resources when scheduling.
---
 .../flink/runtime/checkpoint/Checkpoints.java      |   3 +-
 .../checkpoint/StateAssignmentOperation.java       |   9 +-
 .../dispatcher/DefaultJobManagerRunnerFactory.java |   2 -
 .../executiongraph/DefaultExecutionGraph.java      |  15 +-
 .../DefaultExecutionGraphBuilder.java              |   7 +-
 .../runtime/executiongraph/ExecutionJobVertex.java |  86 +++------
 .../MutableVertexAttemptNumberStore.java           |  15 +-
 .../scheduler/DefaultExecutionGraphFactory.java    |   4 +-
 .../scheduler/DefaultVertexParallelismInfo.java    | 112 +++++++++++
 .../scheduler/DefaultVertexParallelismStore.java   |  47 +++++
 .../runtime/scheduler/ExecutionGraphFactory.java   |   3 +
 .../scheduler/MutableVertexParallelismStore.java   |  32 ++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  59 ++++++
 .../scheduler/VertexParallelismInformation.java    |  55 ++++++
 .../runtime/scheduler/VertexParallelismStore.java  |  36 ++++
 .../scheduler/adaptive/AdaptiveScheduler.java      | 137 +++++++++++++-
 .../scheduler/adaptive/JobGraphJobInformation.java |  27 ++-
 .../scheduler/adaptive/ReactiveModeUtils.java      |  51 -----
 .../checkpoint/CheckpointMetadataLoadingTest.java  |   3 +-
 .../DefaultExecutionGraphConstructionTest.java     |  97 +++++-----
 .../executiongraph/ExecutionGraphTestUtils.java    |   5 +
 .../executiongraph/ExecutionJobVertexTest.java     | 109 ++---------
 .../executiongraph/PointwisePatternTest.java       |  11 +-
 .../TestingDefaultExecutionGraphBuilder.java       |  14 +-
 .../executiongraph/VertexSlotSharingTest.java      |   7 +-
 .../DefaultExecutionGraphFactoryTest.java          |   2 +
 .../DefaultVertexParallelismInfoTest.java          | 131 +++++++++++++
 .../DefaultVertexParallelismStoreTest.java         |  73 +++++++
 .../SchedulerBaseComputeVertexParallelismTest.java |  94 +++++++++
 ...erComputeReactiveModeVertexParallelismTest.java | 124 ++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 209 +++++++++++++++------
 .../runtime/scheduler/adaptive/ExecutingTest.java  |   3 +
 .../topology/BuildExecutionGraphBenchmark.java     |   9 +-
 .../partitioner/RescalePartitionerTest.java        |   7 +-
 .../flink/test/checkpointing/RescalingITCase.java  |   4 +-
 35 files changed, 1243 insertions(+), 359 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index fc792ca..45ff0d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -162,7 +162,8 @@ public class Checkpoints {
             if (executionJobVertex != null) {
 
                 if (executionJobVertex.getMaxParallelism() == 
operatorState.getMaxParallelism()
-                        || !executionJobVertex.isMaxParallelismConfigured()) {
+                        || executionJobVertex.canRescaleMaxParallelism(
+                                operatorState.getMaxParallelism())) {
                     operatorStates.put(operatorState.getOperatorID(), 
operatorState);
                 } else {
                     String msg =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 3381db3..42d7edf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -650,19 +650,16 @@ public class StateAssignmentOperation {
         // satisfy the restored state
         if (operatorState.getMaxParallelism() != 
executionJobVertex.getMaxParallelism()) {
 
-            if (!executionJobVertex.isMaxParallelismConfigured()) {
-                // if the max parallelism was not explicitly specified by the 
user, we derive it
-                // from the state
-
+            if 
(executionJobVertex.canRescaleMaxParallelism(operatorState.getMaxParallelism()))
 {
                 LOG.debug(
-                        "Overriding maximum parallelism for JobVertex {} from 
{} to {}",
+                        "Rescaling maximum parallelism for JobVertex {} from 
{} to {}",
                         executionJobVertex.getJobVertexId(),
                         executionJobVertex.getMaxParallelism(),
                         operatorState.getMaxParallelism());
 
                 
executionJobVertex.setMaxParallelism(operatorState.getMaxParallelism());
             } else {
-                // if the max parallelism was explicitly specified, we 
complain on mismatch
+                // if the max parallelism cannot be rescaled, we complain on 
mismatch
                 throw new IllegalStateException(
                         "The maximum parallelism ("
                                 + operatorState.getMaxParallelism()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index e8543d4..10726d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.scheduler.adaptive.ReactiveModeUtils;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
 import org.apache.flink.util.Preconditions;
@@ -70,7 +69,6 @@ public enum DefaultJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
                     slotPoolServiceSchedulerFactory.getSchedulerType()
                             == JobManagerOptions.SchedulerType.Adaptive,
                     "Adaptive Scheduler is required for reactive mode");
-            ReactiveModeUtils.configureJobGraphForReactiveMode(jobGraph);
         }
 
         final ShuffleMaster<?> shuffleMaster =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index a96bd4c..222d9c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -64,6 +64,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -226,6 +228,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
 
     private final VertexAttemptNumberStore initialAttemptCounts;
 
+    private final VertexParallelismStore parallelismStore;
+
     // ------ Fields that are relevant to the execution and need to be cleared 
before archiving
     // -------
 
@@ -279,7 +283,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
             ExecutionDeploymentListener executionDeploymentListener,
             ExecutionStateUpdateListener executionStateUpdateListener,
             long initializationTimestamp,
-            VertexAttemptNumberStore initialAttemptCounts)
+            VertexAttemptNumberStore initialAttemptCounts,
+            VertexParallelismStore vertexParallelismStore)
             throws IOException {
 
         this.jobInformation = checkNotNull(jobInformation);
@@ -336,6 +341,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
 
         this.initialAttemptCounts = initialAttemptCounts;
 
+        this.parallelismStore = vertexParallelismStore;
+
         this.edgeManager = new EdgeManager();
         this.executionVerticesById = new HashMap<>();
         this.resultPartitionsById = new HashMap<>();
@@ -770,6 +777,9 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
                 this.isStoppable = false;
             }
 
+            VertexParallelismInformation parallelismInfo =
+                    parallelismStore.getParallelismInfo(jobVertex.getID());
+
             // create the execution job vertex and attach it to the graph
             ExecutionJobVertex ejv =
                     new ExecutionJobVertex(
@@ -778,7 +788,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
                             maxPriorAttemptsHistoryLength,
                             rpcTimeout,
                             createTimestamp,
-                            
this.initialAttemptCounts.getAttemptCounts(jobVertex.getID()));
+                            parallelismInfo,
+                            
initialAttemptCounts.getAttemptCounts(jobVertex.getID()));
 
             ejv.connectToPredecessors(this.intermediateResults);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 57dbcd3..be546c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageLoader;
@@ -91,7 +92,8 @@ public class DefaultExecutionGraphBuilder {
             ExecutionDeploymentListener executionDeploymentListener,
             ExecutionStateUpdateListener executionStateUpdateListener,
             long initializationTimestamp,
-            VertexAttemptNumberStore vertexAttemptNumberStore)
+            VertexAttemptNumberStore vertexAttemptNumberStore,
+            VertexParallelismStore vertexParallelismStore)
             throws JobExecutionException, JobException {
 
         checkNotNull(jobGraph, "job graph cannot be null");
@@ -134,7 +136,8 @@ public class DefaultExecutionGraphBuilder {
                             executionDeploymentListener,
                             executionStateUpdateListener,
                             initializationTimestamp,
-                            vertexAttemptNumberStore);
+                            vertexAttemptNumberStore,
+                            vertexParallelismStore);
         } catch (IOException e) {
             throw new JobException("Could not create the ExecutionGraph.", e);
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 66b32d3..050d7a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -46,11 +45,10 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
@@ -85,8 +83,6 @@ public class ExecutionJobVertex
     /** Use the same log for all ExecutionGraph classes. */
     private static final Logger LOG = DefaultExecutionGraph.LOG;
 
-    public static final int VALUE_NOT_SET = -1;
-
     private final Object stateMonitor = new Object();
 
     private final InternalExecutionGraphAccessor graph;
@@ -99,7 +95,7 @@ public class ExecutionJobVertex
 
     private final List<IntermediateResult> inputs;
 
-    private final int parallelism;
+    private final VertexParallelismInformation parallelismInfo;
 
     private final SlotSharingGroup slotSharingGroup;
 
@@ -107,10 +103,6 @@ public class ExecutionJobVertex
 
     private final InputSplit[] inputSplits;
 
-    private final boolean maxParallelismConfigured;
-
-    private int maxParallelism;
-
     private final ResourceProfile resourceProfile;
 
     /**
@@ -132,6 +124,7 @@ public class ExecutionJobVertex
             int maxPriorAttemptsHistoryLength,
             Time timeout,
             long createTimestamp,
+            VertexParallelismInformation parallelismInfo,
             SubtaskAttemptNumberStore initialAttemptCounts)
             throws JobException {
 
@@ -142,30 +135,22 @@ public class ExecutionJobVertex
         this.graph = graph;
         this.jobVertex = jobVertex;
 
-        this.parallelism = jobVertex.getParallelism() > 0 ? 
jobVertex.getParallelism() : 1;
-
-        final int configuredMaxParallelism = jobVertex.getMaxParallelism();
-
-        this.maxParallelismConfigured = (VALUE_NOT_SET != 
configuredMaxParallelism);
-
-        // if no max parallelism was configured by the user, we calculate and 
set a default
-        setMaxParallelismInternal(
-                maxParallelismConfigured
-                        ? configuredMaxParallelism
-                        : 
KeyGroupRangeAssignment.computeDefaultMaxParallelism(this.parallelism));
+        this.parallelismInfo = parallelismInfo;
 
         // verify that our parallelism is not higher than the maximum 
parallelism
-        if (this.parallelism > maxParallelism) {
+        if (this.parallelismInfo.getParallelism() > 
this.parallelismInfo.getMaxParallelism()) {
             throw new JobException(
                     String.format(
                             "Vertex %s's parallelism (%s) is higher than the 
max parallelism (%s). Please lower the parallelism or increase the max 
parallelism.",
-                            jobVertex.getName(), this.parallelism, 
maxParallelism));
+                            jobVertex.getName(),
+                            this.parallelismInfo.getParallelism(),
+                            this.parallelismInfo.getMaxParallelism()));
         }
 
         this.resourceProfile =
                 ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), 
MemorySize.ZERO);
 
-        this.taskVertices = new ExecutionVertex[this.parallelism];
+        this.taskVertices = new 
ExecutionVertex[this.parallelismInfo.getParallelism()];
 
         this.inputs = new ArrayList<>(jobVertex.getInputs().size());
 
@@ -182,11 +167,14 @@ public class ExecutionJobVertex
 
             this.producedDataSets[i] =
                     new IntermediateResult(
-                            result.getId(), this, this.parallelism, 
result.getResultType());
+                            result.getId(),
+                            this,
+                            this.parallelismInfo.getParallelism(),
+                            result.getResultType());
         }
 
         // create all task vertices
-        for (int i = 0; i < this.parallelism; i++) {
+        for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
             ExecutionVertex vertex =
                     new ExecutionVertex(
                             this,
@@ -203,7 +191,7 @@ public class ExecutionJobVertex
         // sanity check for the double referencing between intermediate result 
partitions and
         // execution vertices
         for (IntermediateResult ir : this.producedDataSets) {
-            if (ir.getNumberOfAssignedPartitions() != parallelism) {
+            if (ir.getNumberOfAssignedPartitions() != 
this.parallelismInfo.getParallelism()) {
                 throw new RuntimeException(
                         "The intermediate result's partitions were not 
correctly assigned.");
             }
@@ -242,7 +230,8 @@ public class ExecutionJobVertex
                 ClassLoader oldContextClassLoader = 
currentThread.getContextClassLoader();
                 
currentThread.setContextClassLoader(graph.getUserClassLoader());
                 try {
-                    inputSplits = 
splitSource.createInputSplits(this.parallelism);
+                    inputSplits =
+                            
splitSource.createInputSplits(this.parallelismInfo.getParallelism());
 
                     if (inputSplits != null) {
                         splitAssigner = 
splitSource.getInputSplitAssigner(inputSplits);
@@ -269,31 +258,8 @@ public class ExecutionJobVertex
         return jobVertex.getOperatorIDs();
     }
 
-    public void setMaxParallelism(int maxParallelismDerived) {
-
-        Preconditions.checkState(
-                !maxParallelismConfigured,
-                "Attempt to override a configured max parallelism. Configured: 
"
-                        + this.maxParallelism
-                        + ", argument: "
-                        + maxParallelismDerived);
-
-        setMaxParallelismInternal(maxParallelismDerived);
-    }
-
-    private void setMaxParallelismInternal(int maxParallelism) {
-        if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-            maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
-        }
-
-        Preconditions.checkArgument(
-                maxParallelism > 0
-                        && maxParallelism <= 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
-                "Overriding max parallelism is not in valid bounds (1..%s), 
found: %s",
-                KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
-                maxParallelism);
-
-        this.maxParallelism = maxParallelism;
+    public void setMaxParallelism(int maxParallelism) {
+        parallelismInfo.setMaxParallelism(maxParallelism);
     }
 
     public InternalExecutionGraphAccessor getGraph() {
@@ -311,12 +277,12 @@ public class ExecutionJobVertex
 
     @Override
     public int getParallelism() {
-        return parallelism;
+        return parallelismInfo.getParallelism();
     }
 
     @Override
     public int getMaxParallelism() {
-        return maxParallelism;
+        return parallelismInfo.getMaxParallelism();
     }
 
     @Override
@@ -324,8 +290,8 @@ public class ExecutionJobVertex
         return resourceProfile;
     }
 
-    public boolean isMaxParallelismConfigured() {
-        return maxParallelismConfigured;
+    public boolean canRescaleMaxParallelism(int desiredMaxParallelism) {
+        return parallelismInfo.canRescaleMaxParallelism(desiredMaxParallelism);
     }
 
     public JobID getJobId() {
@@ -379,8 +345,8 @@ public class ExecutionJobVertex
                         new TaskInformation(
                                 jobVertex.getID(),
                                 jobVertex.getName(),
-                                parallelism,
-                                maxParallelism,
+                                parallelismInfo.getParallelism(),
+                                parallelismInfo.getMaxParallelism(),
                                 jobVertex.getInvokableClassName(),
                                 jobVertex.getConfiguration());
 
@@ -399,7 +365,7 @@ public class ExecutionJobVertex
             num[vertex.getExecutionState().ordinal()]++;
         }
 
-        return getAggregateJobVertexState(num, parallelism);
+        return getAggregateJobVertexState(num, 
this.parallelismInfo.getParallelism());
     }
 
     // 
---------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/MutableVertexAttemptNumberStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/MutableVertexAttemptNumberStore.java
index 1234bc5..9af2ac2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/MutableVertexAttemptNumberStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/MutableVertexAttemptNumberStore.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 3dbb80d..f1721c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -91,6 +91,7 @@ public class DefaultExecutionGraphFactory implements 
ExecutionGraphFactory {
             TaskDeploymentDescriptorFactory.PartitionLocationConstraint 
partitionLocationConstraint,
             long initializationTimestamp,
             VertexAttemptNumberStore vertexAttemptNumberStore,
+            VertexParallelismStore vertexParallelismStore,
             Logger log)
             throws Exception {
         ExecutionDeploymentListener executionDeploymentListener =
@@ -122,7 +123,8 @@ public class DefaultExecutionGraphFactory implements 
ExecutionGraphFactory {
                         executionDeploymentListener,
                         executionStateUpdateListener,
                         initializationTimestamp,
-                        vertexAttemptNumberStore);
+                        vertexAttemptNumberStore,
+                        vertexParallelismStore);
 
         final CheckpointCoordinator checkpointCoordinator =
                 newExecutionGraph.getCheckpointCoordinator();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java
new file mode 100644
index 0000000..e76c254
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** A {@link VertexParallelismInformation} implementation that provides common 
validation. */
+public class DefaultVertexParallelismInfo implements 
VertexParallelismInformation {
+    private final int parallelism;
+    private int maxParallelism;
+    private final Function<Integer, Optional<String>> rescaleMaxValidator;
+
+    /**
+     * Create {@link VertexParallelismInformation} with max parallelism 
rescaling validation for a
+     * vertex.
+     *
+     * @param parallelism the vertex's parallelism
+     * @param maxParallelism the vertex's max parallelism
+     * @param rescaleMaxValidator the validation function to provide an error 
message if a max
+     *     parallelism rescale is not allowed
+     */
+    public DefaultVertexParallelismInfo(
+            int parallelism,
+            int maxParallelism,
+            Function<Integer, Optional<String>> rescaleMaxValidator) {
+        this.parallelism = checkParallelism(parallelism);
+        this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism);
+        this.rescaleMaxValidator = 
Preconditions.checkNotNull(rescaleMaxValidator);
+    }
+
+    private static int normalizeAndCheckMaxParallelism(int maxParallelism) {
+        if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+            maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+        }
+
+        return checkBounds("max parallelism", maxParallelism);
+    }
+
+    private static int checkParallelism(int parallelism) {
+        return checkBounds("parallelism", parallelism);
+    }
+
+    private static int checkBounds(String name, int parallelism) {
+        Preconditions.checkArgument(
+                parallelism > 0
+                        && parallelism <= 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                "Setting %s is not in valid bounds (1..%s), found: %s",
+                name,
+                KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+                parallelism);
+        return parallelism;
+    }
+
+    @Override
+    public int getParallelism() {
+        return this.parallelism;
+    }
+
+    @Override
+    public int getMaxParallelism() {
+        return this.maxParallelism;
+    }
+
+    @Override
+    public void setMaxParallelism(int maxParallelism) {
+        maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism);
+
+        Optional<String> validationResult = 
rescaleMaxValidator.apply(maxParallelism);
+        if (validationResult.isPresent()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Rescaling max parallelism from %s to %s is not 
allowed: %s",
+                            this.maxParallelism, maxParallelism, 
validationResult.get()));
+        }
+
+        this.maxParallelism = maxParallelism;
+    }
+
+    @Override
+    public boolean canRescaleMaxParallelism(int desiredMaxParallelism) {
+        // Technically a valid parallelism value, but one that cannot be 
rescaled to
+        if (desiredMaxParallelism == JobVertex.MAX_PARALLELISM_DEFAULT) {
+            return false;
+        }
+
+        return !rescaleMaxValidator
+                .apply(normalizeAndCheckMaxParallelism(desiredMaxParallelism))
+                .isPresent();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java
new file mode 100644
index 0000000..a922a9e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Maintains the configured parallelisms for vertices, which should be 
defined by a scheduler. */
+public class DefaultVertexParallelismStore implements 
MutableVertexParallelismStore {
+    private final Map<JobVertexID, VertexParallelismInformation> 
vertexToParallelismInfo =
+            new HashMap<>();
+
+    @Override
+    public void setParallelismInfo(JobVertexID vertexId, 
VertexParallelismInformation info) {
+        vertexToParallelismInfo.put(vertexId, info);
+    }
+
+    @Override
+    public VertexParallelismInformation getParallelismInfo(JobVertexID 
vertexId) {
+        return Optional.ofNullable(vertexToParallelismInfo.get(vertexId))
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        String.format(
+                                                "No parallelism information 
set for vertex %s",
+                                                vertexId)));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java
index f7fed50..b38dc10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java
@@ -42,6 +42,8 @@ public interface ExecutionGraphFactory {
      * @param initializationTimestamp initializationTimestamp when the 
ExecutionGraph was created
      * @param vertexAttemptNumberStore vertexAttemptNumberStore keeping 
information about the vertex
      *     attempts of previous runs
+     * @param vertexParallelismStore vertexParallelismStore keeping 
information about the vertex
+     *     parallelism and max parallelism settings
      * @param log log to use for logging
      * @return restored {@link ExecutionGraph}
      * @throws Exception if the {@link ExecutionGraph} could not be created 
and restored
@@ -54,6 +56,7 @@ public interface ExecutionGraphFactory {
             TaskDeploymentDescriptorFactory.PartitionLocationConstraint 
partitionLocationConstraint,
             long initializationTimestamp,
             VertexAttemptNumberStore vertexAttemptNumberStore,
+            VertexParallelismStore vertexParallelismStore,
             Logger log)
             throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java
new file mode 100644
index 0000000..fc22247
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/** Mutability extension to the {@link VertexParallelismStore}. */
+public interface MutableVertexParallelismStore extends VertexParallelismStore {
+    /**
+     * Sets the parallelism properties for the given vertex.
+     *
+     * @param vertexId vertex to set parallelism for
+     * @param info parallelism information for the given vertex
+     */
+    void setParallelismInfo(JobVertexID vertexId, VertexParallelismInformation 
info);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index f75bde1..ca999af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -20,6 +20,7 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -57,6 +58,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
@@ -79,6 +81,7 @@ import 
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.util.BoundedFIFOQueue;
 import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.util.ExceptionUtils;
@@ -239,6 +242,61 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
         }
     }
 
+    /**
+     * Compute the {@link VertexParallelismStore} for all given vertices, 
which will set defaults
+     * and ensure that the returned store contains valid parallelisms.
+     *
+     * @param vertices the vertices to compute parallelism for
+     * @return the computed parallelism store
+     */
+    public static VertexParallelismStore computeVertexParallelismStore(
+            Iterable<JobVertex> vertices) {
+        DefaultVertexParallelismStore store = new 
DefaultVertexParallelismStore();
+
+        for (JobVertex vertex : vertices) {
+            int parallelism = vertex.getParallelism();
+            if (vertex.getParallelism() == 
ExecutionConfig.PARALLELISM_DEFAULT) {
+                parallelism = 1;
+            }
+
+            int maxParallelism = vertex.getMaxParallelism();
+            final boolean autoConfigured;
+            // if no max parallelism was configured by the user, we calculate 
and set a default
+            if (maxParallelism == JobVertex.MAX_PARALLELISM_DEFAULT) {
+                maxParallelism = 
KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
+                autoConfigured = true;
+            } else {
+                autoConfigured = false;
+            }
+
+            VertexParallelismInformation parallelismInfo =
+                    new DefaultVertexParallelismInfo(
+                            parallelism,
+                            maxParallelism,
+                            // Allow rescaling if the max parallelism was not 
set explicitly by the
+                            // user
+                            (newMax) ->
+                                    autoConfigured
+                                            ? Optional.empty()
+                                            : Optional.of(
+                                                    "Cannot override a 
configured max parallelism."));
+            store.setParallelismInfo(vertex.getID(), parallelismInfo);
+        }
+
+        return store;
+    }
+
+    /**
+     * Compute the {@link VertexParallelismStore} for all vertices of a given 
job graph, which will
+     * set defaults and ensure that the returned store contains valid 
parallelisms.
+     *
+     * @param jobGraph the job graph to retrieve vertices from
+     * @return the computed parallelism store
+     */
+    public static VertexParallelismStore 
computeVertexParallelismStore(JobGraph jobGraph) {
+        return computeVertexParallelismStore(jobGraph.getVertices());
+    }
+
     private ExecutionGraph createAndRestoreExecutionGraph(
             CompletedCheckpointStore completedCheckpointStore,
             CheckpointsCleaner checkpointsCleaner,
@@ -258,6 +316,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
                                 jobGraph.getJobType()),
                         initializationTimestamp,
                         new DefaultVertexAttemptNumberStore(),
+                        computeVertexParallelismStore(jobGraph),
                         log);
 
         newExecutionGraph.setInternalTaskFailuresListener(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismInformation.java
new file mode 100644
index 0000000..1a9ba0c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismInformation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+/**
+ * Manages the parallelism properties for a vertex in the execution graph, as 
well as how they can
+ * change during runtime.
+ */
+public interface VertexParallelismInformation {
+    /**
+     * Returns a vertex's parallelism.
+     *
+     * @return the parallelism for the vertex
+     */
+    int getParallelism();
+
+    /**
+     * Returns the vertex's max parallelism.
+     *
+     * @return the max parallelism for the vertex
+     */
+    int getMaxParallelism();
+
+    /**
+     * Changes a given vertex's max parallelism property. The caller should 
first check the validity
+     * of the new setting via {@link #canRescaleMaxParallelism}, otherwise 
this operation may fail.
+     *
+     * @param maxParallelism the new max parallelism for the vertex
+     */
+    void setMaxParallelism(int maxParallelism);
+
+    /**
+     * Returns whether the vertex's max parallelism can be changed to a given 
value.
+     *
+     * @param desiredMaxParallelism the desired max parallelism for the vertex
+     * @return whether the max parallelism can be changed to the given value
+     */
+    boolean canRescaleMaxParallelism(int desiredMaxParallelism);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java
new file mode 100644
index 0000000..8bd5adb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * Contains the parallelism information per vertex, i.e. the parallelism and 
max parallelism, along
+ * with how the max parallelism may be changed.
+ */
+public interface VertexParallelismStore {
+    /**
+     * Returns a given vertex's parallelism information.
+     *
+     * @param vertexId vertex for which the parallelism information should be 
returned
+     * @return a parallelism information for the given vertex
+     * @throws IllegalStateException if there is no parallelism information 
for the given vertex
+     */
+    VertexParallelismInformation getParallelismInfo(JobVertexID vertexId);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 1ba3212..f90e723 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
@@ -79,19 +81,25 @@ import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
 import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerUtils;
 import 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ReactiveScaleUpController;
 import 
org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.ScaleUpController;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -148,6 +156,7 @@ public class AdaptiveScheduler
     private static final Logger LOG = 
LoggerFactory.getLogger(AdaptiveScheduler.class);
 
     private final JobGraphJobInformation jobInformation;
+    private final VertexParallelismStore initialParallelismStore;
 
     private final DeclarativeSlotPool declarativeSlotPool;
 
@@ -191,6 +200,8 @@ public class AdaptiveScheduler
 
     private BackgroundTask<ExecutionGraph> backgroundTask = 
BackgroundTask.finishedBackgroundTask();
 
+    private final SchedulerExecutionMode executionMode;
+
     public AdaptiveScheduler(
             JobGraph jobGraph,
             Configuration configuration,
@@ -212,7 +223,13 @@ public class AdaptiveScheduler
 
         assertPreconditions(jobGraph);
 
-        this.jobInformation = new JobGraphJobInformation(jobGraph);
+        this.executionMode = 
configuration.get(JobManagerOptions.SCHEDULER_MODE);
+
+        VertexParallelismStore vertexParallelismStore =
+                computeVertexParallelismStore(jobGraph, executionMode);
+        this.initialParallelismStore = vertexParallelismStore;
+        this.jobInformation = new JobGraphJobInformation(jobGraph, 
vertexParallelismStore);
+
         this.declarativeSlotPool = declarativeSlotPool;
         this.initializationTimestamp = initializationTimestamp;
         this.ioExecutor = ioExecutor;
@@ -270,6 +287,92 @@ public class AdaptiveScheduler
         }
     }
 
+    /**
+     * Creates the parallelism store for a set of vertices, optionally 
adjusting the vertex
+     * parallelisms. If {@code adjustParallelisms} is false, the parallelisms 
are left unchanged
+     * and must be valid for execution.
+     *
+     * <p>We need to set parallelism to the max possible value when requesting 
resources, but when
+     * executing the graph we should respect what we are actually given.
+     *
+     * @param vertices The vertices to store parallelism information for
+     * @param adjustParallelisms Whether to adjust the parallelisms
+     * @return The parallelism store.
+     */
+    @VisibleForTesting
+    static VertexParallelismStore computeReactiveModeVertexParallelismStore(
+            Iterable<JobVertex> vertices, boolean adjustParallelisms) {
+        DefaultVertexParallelismStore store = new 
DefaultVertexParallelismStore();
+
+        for (JobVertex vertex : vertices) {
+            // when adjusting, a default max parallelism is computed if the 
user configured none
+            final int parallelism;
+            final int maxParallelism;
+            if (adjustParallelisms) {
+                maxParallelism =
+                        vertex.getMaxParallelism() == 
JobVertex.MAX_PARALLELISM_DEFAULT
+                                ? 
KeyGroupRangeAssignment.computeDefaultMaxParallelism(
+                                        vertex.getParallelism())
+                                : vertex.getMaxParallelism();
+                parallelism = maxParallelism;
+            } else {
+                parallelism = vertex.getParallelism();
+                maxParallelism = vertex.getMaxParallelism();
+            }
+
+            VertexParallelismInformation parallelismInfo =
+                    new DefaultVertexParallelismInfo(
+                            parallelism,
+                            maxParallelism,
+                            // Allow rescaling if the new desired max 
parallelism
+                            // is not less than what was declared here during 
scheduling.
+                            // This prevents the situation where more 
resources are requested
+                            // based on the computed default, when actually 
fewer are necessary.
+                            (newMax) ->
+                                    newMax >= maxParallelism
+                                            ? Optional.empty()
+                                            : Optional.of(
+                                                    "Cannot lower max 
parallelism in Reactive mode."));
+            store.setParallelismInfo(vertex.getID(), parallelismInfo);
+        }
+
+        return store;
+    }
+
+    /**
+     * Creates the parallelism store that should be used for determining 
scheduling requirements,
+     * which may choose different parallelisms than set in the {@link 
JobGraph} depending on the
+     * execution mode.
+     *
+     * @param jobGraph The job graph for execution.
+     * @param executionMode The mode of scheduler execution.
+     * @return The parallelism store.
+     */
+    private static VertexParallelismStore computeVertexParallelismStore(
+            JobGraph jobGraph, SchedulerExecutionMode executionMode) {
+        if (executionMode == SchedulerExecutionMode.REACTIVE) {
+            return 
computeReactiveModeVertexParallelismStore(jobGraph.getVertices(), true);
+        }
+        return SchedulerBase.computeVertexParallelismStore(jobGraph);
+    }
+
+    /**
+     * Creates the parallelism store that should be used to build the {@link 
ExecutionGraph}, which
+     * will respect the vertex parallelism of the passed {@link JobGraph} in 
all execution modes.
+     *
+     * @param jobGraph The job graph for execution.
+     * @param executionMode The mode of scheduler execution.
+     * @return The parallelism store.
+     */
+    @VisibleForTesting
+    static VertexParallelismStore computeVertexParallelismStoreForExecution(
+            JobGraph jobGraph, SchedulerExecutionMode executionMode) {
+        if (executionMode == SchedulerExecutionMode.REACTIVE) {
+            return 
computeReactiveModeVertexParallelismStore(jobGraph.getVertices(), false);
+        }
+        return SchedulerBase.computeVertexParallelismStore(jobGraph);
+    }
+
     private void newResourcesAvailable(Collection<? extends PhysicalSlot> 
physicalSlots) {
         state.tryRun(
                 ResourceConsumer.class,
@@ -778,21 +881,33 @@ public class AdaptiveScheduler
 
     private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
             createExecutionGraphWithAvailableResourcesAsync() {
-        final JobGraph adjustedJobGraph;
         final VertexParallelism vertexParallelism;
+        final VertexParallelismStore adjustedParallelismStore;
 
         try {
             vertexParallelism = determineParallelism(slotAllocator);
+            JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
 
-            adjustedJobGraph = jobInformation.copyJobGraph();
             for (JobVertex vertex : adjustedJobGraph.getVertices()) {
-                
vertex.setParallelism(vertexParallelism.getParallelism(vertex.getID()));
+                JobVertexID id = vertex.getID();
+
+                // use the originally computed max parallelism for constant 
runs,
+                // and the determined "available parallelism" to use
+                // the resources we have access to
+                vertex.setParallelism(vertexParallelism.getParallelism(id));
+
+                VertexParallelismInformation vertexParallelismInfo =
+                        initialParallelismStore.getParallelismInfo(id);
+                
vertex.setMaxParallelism(vertexParallelismInfo.getMaxParallelism());
             }
+
+            adjustedParallelismStore =
+                    
computeVertexParallelismStoreForExecution(adjustedJobGraph, executionMode);
         } catch (Exception exception) {
             return FutureUtils.completedExceptionally(exception);
         }
 
-        return createExecutionGraphAndRestoreStateAsync(adjustedJobGraph)
+        return 
createExecutionGraphAndRestoreStateAsync(adjustedParallelismStore)
                 .thenApply(
                         executionGraph ->
                                 
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
@@ -838,28 +953,30 @@ public class AdaptiveScheduler
     }
 
     private CompletableFuture<ExecutionGraph> 
createExecutionGraphAndRestoreStateAsync(
-            JobGraph adjustedJobGraph) {
+            VertexParallelismStore adjustedParallelismStore) {
         backgroundTask.abort();
 
         backgroundTask =
                 backgroundTask.runAfter(
-                        () -> 
createExecutionGraphAndRestoreState(adjustedJobGraph), ioExecutor);
+                        () -> 
createExecutionGraphAndRestoreState(adjustedParallelismStore),
+                        ioExecutor);
 
         return FutureUtils.switchExecutor(
                 backgroundTask.getResultFuture(), getMainThreadExecutor());
     }
 
     @Nonnull
-    private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph 
adjustedJobGraph)
-            throws Exception {
+    private ExecutionGraph createExecutionGraphAndRestoreState(
+            VertexParallelismStore adjustedParallelismStore) throws Exception {
         return executionGraphFactory.createAndRestoreExecutionGraph(
-                adjustedJobGraph,
+                jobInformation.copyJobGraph(),
                 completedCheckpointStore,
                 checkpointsCleaner,
                 checkpointIdCounter,
                 
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
                 initializationTimestamp,
                 vertexAttemptNumberStore,
+                adjustedParallelismStore,
                 LOG);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
index 707d21f..849a5b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -36,11 +38,14 @@ public class JobGraphJobInformation implements 
JobInformation {
     private final JobGraph jobGraph;
     private final JobID jobID;
     private final String name;
+    private final VertexParallelismStore vertexParallelismStore;
 
-    public JobGraphJobInformation(JobGraph jobGraph) {
+    public JobGraphJobInformation(
+            JobGraph jobGraph, VertexParallelismStore vertexParallelismStore) {
         this.jobGraph = jobGraph;
         this.jobID = jobGraph.getJobID();
         this.name = jobGraph.getName();
+        this.vertexParallelismStore = vertexParallelismStore;
     }
 
     @Override
@@ -50,7 +55,9 @@ public class JobGraphJobInformation implements JobInformation 
{
 
     @Override
     public JobInformation.VertexInformation getVertexInformation(JobVertexID 
jobVertexId) {
-        return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId));
+        return new JobVertexInformation(
+                jobGraph.findVertexByID(jobVertexId),
+                vertexParallelismStore.getParallelismInfo(jobVertexId));
     }
 
     public JobID getJobID() {
@@ -62,12 +69,8 @@ public class JobGraphJobInformation implements 
JobInformation {
     }
 
     public Iterable<JobInformation.VertexInformation> getVertices() {
-        return jobGraphVerticesToVertexInformation(jobGraph.getVertices());
-    }
-
-    public static Iterable<JobInformation.VertexInformation> 
jobGraphVerticesToVertexInformation(
-            Iterable<JobVertex> verticesIterable) {
-        return Iterables.transform(verticesIterable, 
JobVertexInformation::new);
+        return Iterables.transform(
+                jobGraph.getVertices(), (vertex) -> 
getVertexInformation(vertex.getID()));
     }
 
     /** Returns a copy of a jobGraph that can be mutated. */
@@ -79,8 +82,12 @@ public class JobGraphJobInformation implements 
JobInformation {
 
         private final JobVertex jobVertex;
 
-        private JobVertexInformation(JobVertex jobVertex) {
+        private final VertexParallelismInformation parallelismInfo;
+
+        private JobVertexInformation(
+                JobVertex jobVertex, VertexParallelismInformation 
parallelismInfo) {
             this.jobVertex = jobVertex;
+            this.parallelismInfo = parallelismInfo;
         }
 
         @Override
@@ -90,7 +97,7 @@ public class JobGraphJobInformation implements JobInformation 
{
 
         @Override
         public int getParallelism() {
-            return jobVertex.getParallelism();
+            return parallelismInfo.getParallelism();
         }
 
         @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
deleted file mode 100644
index c0d9cfa..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Utilities for reactive mode. */
-public final class ReactiveModeUtils {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ReactiveModeUtils.class);
-
-    /**
-     * Sets the parallelism of all vertices in the passed JobGraph to the 
highest possible max
-     * parallelism, unless the user defined a maxParallelism.
-     *
-     * @param jobGraph The JobGraph to modify.
-     */
-    public static void configureJobGraphForReactiveMode(JobGraph jobGraph) {
-        LOG.info("Modifying job parallelism for running in reactive mode.");
-        for (JobVertex vertex : jobGraph.getVertices()) {
-            if (vertex.getMaxParallelism() == 
JobVertex.MAX_PARALLELISM_DEFAULT) {
-                
vertex.setParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
-                
vertex.setMaxParallelism(Transformation.UPPER_BOUND_MAX_PARALLELISM);
-            } else {
-                vertex.setParallelism(vertex.getMaxParallelism());
-            }
-        }
-    }
-
-    private ReactiveModeUtils() {}
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 4b6cb4b..c809573 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -45,6 +45,7 @@ import static 
org.apache.flink.runtime.checkpoint.StateObjectCollection.singleto
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -213,7 +214,7 @@ public class CheckpointMetadataLoadingTest {
                 
.thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorId)));
 
         if (parallelism != maxParallelism) {
-            when(vertex.isMaxParallelismConfigured()).thenReturn(true);
+            when(vertex.canRescaleMaxParallelism(anyInt())).thenReturn(false);
         }
 
         Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
index 5b7ff17..31e9473 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
@@ -41,6 +42,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
@@ -57,8 +60,10 @@ import static org.mockito.Mockito.when;
  */
 public class DefaultExecutionGraphConstructionTest {
 
-    private ExecutionGraph createDefaultExecutionGraph() throws Exception {
-        return TestingDefaultExecutionGraphBuilder.newBuilder().build();
+    private ExecutionGraph createDefaultExecutionGraph(List<JobVertex> 
vertices) throws Exception {
+        return TestingDefaultExecutionGraphBuilder.newBuilder()
+                
.setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(vertices))
+                .build();
     }
 
     @Test
@@ -77,8 +82,8 @@ public class DefaultExecutionGraphConstructionTest {
 
         List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
 
-        ExecutionGraph eg1 = createDefaultExecutionGraph();
-        ExecutionGraph eg2 = createDefaultExecutionGraph();
+        ExecutionGraph eg1 = createDefaultExecutionGraph(ordered);
+        ExecutionGraph eg2 = createDefaultExecutionGraph(ordered);
         eg1.attachJobGraph(ordered);
         eg2.attachJobGraph(ordered);
 
@@ -135,7 +140,7 @@ public class DefaultExecutionGraphConstructionTest {
 
         List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, 
v2, v3, v4, v5));
 
-        ExecutionGraph eg = createDefaultExecutionGraph();
+        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
         try {
             eg.attachJobGraph(ordered);
         } catch (JobException e) {
@@ -172,18 +177,6 @@ public class DefaultExecutionGraphConstructionTest {
         IntermediateDataSet v3result_2 =
                 v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 
-        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, 
v2, v3));
-
-        ExecutionGraph eg = createDefaultExecutionGraph();
-        try {
-            eg.attachJobGraph(ordered);
-        } catch (JobException e) {
-            e.printStackTrace();
-            fail("Job failed with exception: " + e.getMessage());
-        }
-
-        // attach the second part of the graph
-
         JobVertex v4 = new JobVertex("vertex4");
         JobVertex v5 = new JobVertex("vertex5");
         v4.setParallelism(11);
@@ -198,7 +191,22 @@ public class DefaultExecutionGraphConstructionTest {
                 v4, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
         v5.connectDataSetAsInput(v3result_2, DistributionPattern.ALL_TO_ALL);
 
-        List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, 
v5));
+        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+
+        List<JobVertex> ordered2 = Arrays.asList(v4, v5);
+
+        ExecutionGraph eg =
+                createDefaultExecutionGraph(
+                        Stream.concat(ordered.stream(), ordered2.stream())
+                                .collect(Collectors.toList()));
+        try {
+            eg.attachJobGraph(ordered);
+        } catch (JobException e) {
+            e.printStackTrace();
+            fail("Job failed with exception: " + e.getMessage());
+        }
+
+        // attach the second part of the graph
 
         try {
             eg.attachJobGraph(ordered2);
@@ -237,18 +245,7 @@ public class DefaultExecutionGraphConstructionTest {
         IntermediateDataSet v3result_2 =
                 v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 
-        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, 
v2, v3));
-
-        ExecutionGraph eg = createDefaultExecutionGraph();
-        try {
-            eg.attachJobGraph(ordered);
-        } catch (JobException e) {
-            e.printStackTrace();
-            fail("Job failed with exception: " + e.getMessage());
-        }
-
-        // attach the second part of the graph
-
+        // construct part two of the execution graph
         JobVertex v4 = new JobVertex("vertex4");
         JobVertex v5 = new JobVertex("vertex5");
         v4.setParallelism(11);
@@ -263,8 +260,21 @@ public class DefaultExecutionGraphConstructionTest {
                 v4, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.PIPELINED);
         v5.connectIdInput(v3result_2.getId(), DistributionPattern.ALL_TO_ALL);
 
-        List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, 
v5));
+        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+        List<JobVertex> ordered2 = Arrays.asList(v4, v5);
 
+        ExecutionGraph eg =
+                createDefaultExecutionGraph(
+                        Stream.concat(ordered.stream(), ordered2.stream())
+                                .collect(Collectors.toList()));
+        try {
+            eg.attachJobGraph(ordered);
+        } catch (JobException e) {
+            e.printStackTrace();
+            fail("Job failed with exception: " + e.getMessage());
+        }
+
+        // attach the second part of the graph
         try {
             eg.attachJobGraph(ordered2);
         } catch (JobException e) {
@@ -302,9 +312,18 @@ public class DefaultExecutionGraphConstructionTest {
         v1.setParallelism(7);
         v1.setInvokableClass(AbstractInvokable.class);
 
-        List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
+        // construct part two of the execution graph
+        JobVertex v2 = new JobVertex("vertex2");
+        v2.setInvokableClass(AbstractInvokable.class);
+        v2.connectIdInput(new IntermediateDataSetID(), 
DistributionPattern.ALL_TO_ALL);
+
+        List<JobVertex> ordered = Arrays.asList(v1);
+        List<JobVertex> ordered2 = Arrays.asList(v2);
 
-        ExecutionGraph eg = createDefaultExecutionGraph();
+        ExecutionGraph eg =
+                createDefaultExecutionGraph(
+                        Stream.concat(ordered.stream(), ordered2.stream())
+                                .collect(Collectors.toList()));
         try {
             eg.attachJobGraph(ordered);
         } catch (JobException e) {
@@ -313,12 +332,6 @@ public class DefaultExecutionGraphConstructionTest {
         }
 
         // attach the second part of the graph
-        JobVertex v2 = new JobVertex("vertex2");
-        v2.setInvokableClass(AbstractInvokable.class);
-        v2.connectIdInput(new IntermediateDataSetID(), 
DistributionPattern.ALL_TO_ALL);
-
-        List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v2));
-
         try {
             eg.attachJobGraph(ordered2);
             fail("Attached wrong jobgraph");
@@ -360,7 +373,7 @@ public class DefaultExecutionGraphConstructionTest {
 
         List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, 
v2, v3, v5, v4));
 
-        ExecutionGraph eg = createDefaultExecutionGraph();
+        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
         try {
             eg.attachJobGraph(ordered);
             fail("Attached wrong jobgraph");
@@ -425,7 +438,7 @@ public class DefaultExecutionGraphConstructionTest {
 
             List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
-            ExecutionGraph eg = createDefaultExecutionGraph();
+            ExecutionGraph eg = createDefaultExecutionGraph(ordered);
             try {
                 eg.attachJobGraph(ordered);
             } catch (JobException e) {
@@ -459,7 +472,7 @@ public class DefaultExecutionGraphConstructionTest {
 
             List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
-            ExecutionGraph eg = createDefaultExecutionGraph();
+            ExecutionGraph eg = createDefaultExecutionGraph(ordered);
 
             try {
                 eg.attachJobGraph(ordered);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index c78e8e4..b38ffb1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -347,9 +347,14 @@ public class ExecutionGraphTestUtils {
     }
 
     public static JobVertex createNoOpVertex(String name, int parallelism) {
+        return createNoOpVertex(name, parallelism, 
JobVertex.MAX_PARALLELISM_DEFAULT);
+    }
+
+    public static JobVertex createNoOpVertex(String name, int parallelism, int 
maxParallelism) {
         JobVertex vertex = new JobVertex(name);
         vertex.setInvokableClass(NoOpInvokable.class);
         vertex.setParallelism(parallelism);
+        vertex.setMaxParallelism(maxParallelism);
         return vertex;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
index 756516f..cf37e87 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
@@ -22,110 +22,23 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
-import org.junit.Assert;
 import org.junit.Test;
 
-public class ExecutionJobVertexTest {
-
-    private static final int NOT_CONFIGURED = -1;
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 
+/** Tests for {@link ExecutionJobVertex}. */
+public class ExecutionJobVertexTest {
     @Test
-    public void testMaxParallelismDefaulting() throws Exception {
-
-        // default minimum
-        ExecutionJobVertex executionJobVertex = createExecutionJobVertex(1, 
NOT_CONFIGURED);
-        Assert.assertEquals(128, executionJobVertex.getMaxParallelism());
-
-        // test round up part 1
-        executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED);
-        Assert.assertEquals(256, executionJobVertex.getMaxParallelism());
-
-        // test round up part 2
-        executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED);
-        Assert.assertEquals(512, executionJobVertex.getMaxParallelism());
-
-        // test round up limit
-        executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED);
-        Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
-
-        // test upper bound
-        try {
-            executionJobVertex = createExecutionJobVertex(1 + (1 << 15), 
NOT_CONFIGURED);
-            executionJobVertex.getMaxParallelism();
-            Assert.fail();
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // parallelism must be smaller than the max parallelism
-        try {
-            createExecutionJobVertex(172, 4);
-            Assert.fail(
-                    "We should not be able to create an ExecutionJobVertex 
which "
-                            + "has a smaller max parallelism than 
parallelism.");
-        } catch (JobException ignored) {
-            // expected
-        }
-
-        // test configured / trumps computed default
-        executionJobVertex = createExecutionJobVertex(4, 1 << 15);
-        Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
-
-        // test upper bound configured
-        try {
-            executionJobVertex = createExecutionJobVertex(4, 1 + (1 << 15));
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // test lower bound configured
-        try {
-            executionJobVertex = createExecutionJobVertex(4, 0);
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // test override trumps test configured 2
-        executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
-        executionJobVertex.setMaxParallelism(7);
-        Assert.assertEquals(7, executionJobVertex.getMaxParallelism());
-
-        // test lower bound with derived value
-        executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
-        try {
-            executionJobVertex.setMaxParallelism(0);
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // test upper bound with derived value
-        executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
-        try {
-            executionJobVertex.setMaxParallelism(1 + (1 << 15));
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalArgumentException ignore) {
-        }
-
-        // test complain on setting derived value in presence of a configured 
value
-        executionJobVertex = createExecutionJobVertex(4, 16);
-        try {
-            executionJobVertex.setMaxParallelism(7);
-            
Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
-        } catch (IllegalStateException ignore) {
-        }
-    }
-
-    // 
------------------------------------------------------------------------------------------------------
-
-    public static ExecutionJobVertex createExecutionJobVertex(
-            int parallelism, int preconfiguredMaxParallelism) throws Exception 
{
+    public void testParallelismGreaterThanMaxParallelism() {
         JobVertex jobVertex = new JobVertex("testVertex");
         jobVertex.setInvokableClass(AbstractInvokable.class);
-        jobVertex.setParallelism(parallelism);
-
-        if (NOT_CONFIGURED != preconfiguredMaxParallelism) {
-            jobVertex.setMaxParallelism(preconfiguredMaxParallelism);
-        }
+        // parallelism must not exceed the max parallelism
+        jobVertex.setParallelism(172);
+        jobVertex.setMaxParallelism(4);
 
-        return ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex);
+        assertThrows(
+                "higher than the max parallelism",
+                JobException.class,
+                () -> 
ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 75105a4..cba3bdb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 
 import org.junit.Test;
@@ -173,10 +174,6 @@ public class PointwisePatternTest {
         testConnections(10, 6, new int[][] {{0}, {1, 2}, {3, 4}, {5}, {6, 7}, 
{8, 9}});
     }
 
-    private ExecutionGraph getDummyExecutionGraph() throws Exception {
-        return TestingDefaultExecutionGraphBuilder.newBuilder().build();
-    }
-
     private void testLowToHigh(int lowDop, int highDop) throws Exception {
         if (highDop < lowDop) {
             throw new IllegalArgumentException();
@@ -254,7 +251,11 @@ public class PointwisePatternTest {
 
         List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
 
-        ExecutionGraph eg = getDummyExecutionGraph();
+        ExecutionGraph eg =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setVertexParallelismStore(
+                                
SchedulerBase.computeVertexParallelismStore(ordered))
+                        .build();
         try {
             eg.attachJobGraph(ordered);
         } catch (JobException e) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 52c46a6..1a4180e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -37,6 +37,8 @@ import 
org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -44,6 +46,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -74,6 +77,7 @@ public class TestingDefaultExecutionGraphBuilder {
     private ExecutionDeploymentListener executionDeploymentListener =
             NoOpExecutionDeploymentListener.get();
     private ExecutionStateUpdateListener executionStateUpdateListener = 
(execution, newState) -> {};
+    private VertexParallelismStore vertexParallelismStore;
 
     private TestingDefaultExecutionGraphBuilder() {}
 
@@ -158,6 +162,12 @@ public class TestingDefaultExecutionGraphBuilder {
         return this;
     }
 
+    public TestingDefaultExecutionGraphBuilder setVertexParallelismStore(
+            VertexParallelismStore store) {
+        this.vertexParallelismStore = store;
+        return this;
+    }
+
     public DefaultExecutionGraph build() throws JobException, 
JobExecutionException {
         return DefaultExecutionGraphBuilder.buildGraph(
                 jobGraph,
@@ -179,6 +189,8 @@ public class TestingDefaultExecutionGraphBuilder {
                 executionDeploymentListener,
                 executionStateUpdateListener,
                 System.currentTimeMillis(),
-                new DefaultVertexAttemptNumberStore());
+                new DefaultVertexAttemptNumberStore(),
+                Optional.ofNullable(vertexParallelismStore)
+                        .orElseGet(() -> 
SchedulerBase.computeVertexParallelismStore(jobGraph)));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index c06ee09..3c56d35 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 
 import org.junit.Test;
 
@@ -79,7 +80,11 @@ public class VertexSlotSharingTest {
 
             List<JobVertex> vertices = new ArrayList<>(Arrays.asList(v1, v2, 
v3, v4, v5));
 
-            ExecutionGraph eg = 
TestingDefaultExecutionGraphBuilder.newBuilder().build();
+            ExecutionGraph eg =
+                    TestingDefaultExecutionGraphBuilder.newBuilder()
+                            .setVertexParallelismStore(
+                                    
SchedulerBase.computeVertexParallelismStore(vertices))
+                            .build();
             eg.attachJobGraph(vertices);
 
             // verify that the vertices are all in the same slot sharing group
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java
index 2a57d04..c838e93 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java
@@ -76,6 +76,7 @@ public class DefaultExecutionGraphFactoryTest extends 
TestLogger {
                     
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN,
                     0L,
                     new DefaultVertexAttemptNumberStore(),
+                    
SchedulerBase.computeVertexParallelismStore(jobGraphWithNewOperator),
                     log);
             fail("Expected ExecutionGraph creation to fail because of non 
restored state.");
         } catch (Exception e) {
@@ -103,6 +104,7 @@ public class DefaultExecutionGraphFactoryTest extends 
TestLogger {
                 
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN,
                 0L,
                 new DefaultVertexAttemptNumberStore(),
+                
SchedulerBase.computeVertexParallelismStore(jobGraphWithNewOperator),
                 log);
 
         final CompletedCheckpoint savepoint = 
completedCheckpointStore.getLatestCheckpoint(false);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfoTest.java
new file mode 100644
index 0000000..b09154b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfoTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+
+/** Tests for the {@link DefaultVertexParallelismInfo}. */
+public class DefaultVertexParallelismInfoTest extends TestLogger {
+    private static final Function<Integer, Optional<String>> ALWAYS_VALID =
+            (max) -> Optional.empty();
+
+    @Test
+    public void parallelismInvalid() {
+        assertThrows(
+                "parallelism is not in valid bounds",
+                IllegalArgumentException.class,
+                () -> new DefaultVertexParallelismInfo(-1, 1, ALWAYS_VALID));
+    }
+
+    @Test
+    public void maxParallelismInvalid() {
+        assertThrows(
+                "max parallelism is not in valid bounds",
+                IllegalArgumentException.class,
+                () -> new DefaultVertexParallelismInfo(1, -1, ALWAYS_VALID));
+    }
+
+    @Test
+    public void setAutoMax() {
+        DefaultVertexParallelismInfo info =
+                new DefaultVertexParallelismInfo(
+                        1, ExecutionConfig.PARALLELISM_AUTO_MAX, ALWAYS_VALID);
+
+        Assert.assertEquals(
+                KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, 
info.getMaxParallelism());
+    }
+
+    @Test
+    public void canRescaleMaxOutOfBounds() {
+        DefaultVertexParallelismInfo info = new 
DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID);
+
+        assertThrows(
+                "not in valid bounds",
+                IllegalArgumentException.class,
+                () -> info.canRescaleMaxParallelism(-4));
+    }
+
+    @Test
+    public void canRescaleMaxAuto() {
+        DefaultVertexParallelismInfo info = new 
DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID);
+
+        
Assert.assertTrue(info.canRescaleMaxParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX));
+    }
+
+    @Test
+    public void canRescaleMax() {
+        DefaultVertexParallelismInfo info = new 
DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID);
+
+        Assert.assertTrue(info.canRescaleMaxParallelism(3));
+    }
+
+    @Test
+    public void canRescaleMaxDefault() {
+        DefaultVertexParallelismInfo info = new 
DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID);
+
+        
Assert.assertFalse(info.canRescaleMaxParallelism(JobVertex.MAX_PARALLELISM_DEFAULT));
+    }
+
+    @Test
+    public void setMaxOutOfBounds() {
+        DefaultVertexParallelismInfo info = new 
DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID);
+
+        assertThrows(
+                "not in valid bounds",
+                IllegalArgumentException.class,
+                () -> {
+                    info.setMaxParallelism(-4);
+                    return null;
+                });
+    }
+
+    @Test
+    public void setMaxInvalid() {
+        DefaultVertexParallelismInfo info =
+                new DefaultVertexParallelismInfo(1, 1, (max) -> 
Optional.of("not valid"));
+
+        assertThrows(
+                "not valid",
+                IllegalArgumentException.class,
+                () -> {
+                    info.setMaxParallelism(4);
+                    return null;
+                });
+    }
+
+    @Test
+    public void setMaxValid() {
+        DefaultVertexParallelismInfo info = new 
DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID);
+
+        info.setMaxParallelism(40);
+
+        Assert.assertEquals(40, info.getMaxParallelism());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java
new file mode 100644
index 0000000..d4ecf73
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+
+/** Tests for the {@link DefaultVertexParallelismStore}. */
+public class DefaultVertexParallelismStoreTest extends TestLogger {
+    @Test
+    public void testNotSet() {
+        DefaultVertexParallelismStore store = new 
DefaultVertexParallelismStore();
+
+        assertThrows(
+                "No parallelism information set for vertex",
+                IllegalStateException.class,
+                () -> store.getParallelismInfo(new JobVertexID()));
+    }
+
+    @Test
+    public void testSetInfo() {
+        JobVertexID id = new JobVertexID();
+        VertexParallelismInformation info = new MockVertexParallelismInfo();
+        DefaultVertexParallelismStore store = new 
DefaultVertexParallelismStore();
+
+        store.setParallelismInfo(id, info);
+
+        VertexParallelismInformation storedInfo = store.getParallelismInfo(id);
+        // the lookup must hand back the instance that was stored for this id
+        Assert.assertEquals(info, storedInfo);
+    }
+
+    private static final class MockVertexParallelismInfo implements 
VertexParallelismInformation {
+        @Override
+        public int getParallelism() {
+            return 0;
+        }
+
+        @Override
+        public int getMaxParallelism() {
+            return 0;
+        }
+
+        @Override
+        public void setMaxParallelism(int maxParallelism) {}
+
+        @Override
+        public boolean canRescaleMaxParallelism(int desiredMaxParallelism) {
+            return false;
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseComputeVertexParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseComputeVertexParallelismTest.java
new file mode 100644
index 0000000..af80209
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseComputeVertexParallelismTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collections;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+
+/** Tests max parallelism defaulting in {@link SchedulerBase#computeVertexParallelismStore}. */
+@RunWith(Parameterized.class)
+public class SchedulerBaseComputeVertexParallelismTest extends TestLogger {
+    @Parameterized.Parameters(
+            name =
+                    "parallelism = {0}, maxParallelism = {1}, expected max = 
{2}, rescale to = {3}, can rescale = {4}")
+    public static Object[][] data() {
+        return new Object[][] {
+            // max not configured: the default minimum of 128 applies, rescaling is allowed
+            {1, JobVertex.MAX_PARALLELISM_DEFAULT, 128, 3, true},
+            // max not configured: the default rounds up to 256, rescaling is allowed
+            {171, JobVertex.MAX_PARALLELISM_DEFAULT, 256, 42, true},
+            // max not configured: one more subtask rounds the default up to 512
+            {172, JobVertex.MAX_PARALLELISM_DEFAULT, 512, 174, true},
+            // max not configured: the default is limited to UPPER_BOUND_MAX_PARALLELISM
+            {
+                UPPER_BOUND_MAX_PARALLELISM,
+                JobVertex.MAX_PARALLELISM_DEFAULT,
+                UPPER_BOUND_MAX_PARALLELISM,
+                UPPER_BOUND_MAX_PARALLELISM,
+                true
+            },
+            // configured max takes precedence over the computed default, rescaling is rejected
+            {4, UPPER_BOUND_MAX_PARALLELISM, UPPER_BOUND_MAX_PARALLELISM, 3, 
false},
+            // configured max (7) is kept as is, rescaling is rejected
+            {4, 7, 7, UPPER_BOUND_MAX_PARALLELISM, false},
+        };
+    }
+
+    @Parameterized.Parameter(0)
+    public int parallelism;
+
+    @Parameterized.Parameter(1)
+    public int maxParallelism;
+
+    @Parameterized.Parameter(2)
+    public int expectedMaxParallelism;
+
+    @Parameterized.Parameter(3)
+    public int maxToScaleTo;
+
+    @Parameterized.Parameter(4)
+    public boolean expectedCanRescaleTo;
+
+    @Test
+    public void testMaxParallelismDefaulting() {
+        JobVertex jobVertex = createNoOpVertex("test", parallelism, 
maxParallelism);
+        VertexParallelismStore store =
+                
SchedulerBase.computeVertexParallelismStore(Collections.singleton(jobVertex));
+
+        VertexParallelismInformation info = 
store.getParallelismInfo(jobVertex.getID());
+
+        Assert.assertEquals("constant parallelism", parallelism, 
info.getParallelism());
+        Assert.assertEquals("expected max", expectedMaxParallelism, 
info.getMaxParallelism());
+        // maxToScaleTo is only probed via canRescaleMaxParallelism; the max is never changed here
+        Assert.assertEquals(
+                "can rescale max",
+                expectedCanRescaleTo,
+                info.canRescaleMaxParallelism(maxToScaleTo));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerComputeReactiveModeVertexParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerComputeReactiveModeVertexParallelismTest.java
new file mode 100644
index 0000000..8ce3fec
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerComputeReactiveModeVertexParallelismTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collections;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assume.assumeThat;
+
+/** Test vertex parallelism configuration for the {@link AdaptiveScheduler} in 
Reactive mode. */
+@RunWith(Parameterized.class)
+public class AdaptiveSchedulerComputeReactiveModeVertexParallelismTest extends 
TestLogger {
+    @Parameterized.Parameters(
+            name =
+                    "parallelism = {0}, maxParallelism = {1}, expected max = 
{2}, rescale to = {3}, can rescale = {4}")
+    public static Object[][] data() {
+        return new Object[][] {
+            // max not configured (default minimum of 128): rescaling to a higher max is allowed
+            {1, JobVertex.MAX_PARALLELISM_DEFAULT, 128, 129, true},
+            // max not configured (rounds up to 256): rescaling to a lower max is rejected
+            {171, JobVertex.MAX_PARALLELISM_DEFAULT, 256, 42, false},
+            // max not configured (rounds up to 512): rescaling to the same max is allowed
+            {172, JobVertex.MAX_PARALLELISM_DEFAULT, 512, 512, true},
+            // max not configured (limited to the upper bound): rescaling to the same max is allowed
+            {
+                UPPER_BOUND_MAX_PARALLELISM,
+                JobVertex.MAX_PARALLELISM_DEFAULT,
+                UPPER_BOUND_MAX_PARALLELISM,
+                UPPER_BOUND_MAX_PARALLELISM,
+                true
+            },
+            // configured max wins over the computed default: rescaling to lower is rejected
+            {4, UPPER_BOUND_MAX_PARALLELISM, UPPER_BOUND_MAX_PARALLELISM, 3, 
false},
+            // configured max (7) is kept as is: rescaling to a higher max is allowed
+            {4, 7, 7, UPPER_BOUND_MAX_PARALLELISM, true},
+        };
+    }
+
+    @Parameterized.Parameter(0)
+    public int parallelism;
+
+    @Parameterized.Parameter(1)
+    public int maxParallelism;
+
+    @Parameterized.Parameter(2)
+    public int expectedMaxParallelism;
+
+    @Parameterized.Parameter(3)
+    public int maxToScaleTo;
+
+    @Parameterized.Parameter(4)
+    public boolean expectedCanRescaleTo;
+
+    @Test
+    public void testCreateStoreWithoutAdjustedParallelism() {
+        assumeThat(
+                "max parallelism must be set",
+                maxParallelism,
+                is(not(JobVertex.MAX_PARALLELISM_DEFAULT)));
+        // cases that leave the max parallelism unset are skipped by the assumption above
+        JobVertex jobVertex = createNoOpVertex("test", parallelism, 
maxParallelism);
+        VertexParallelismStore store =
+                AdaptiveScheduler.computeReactiveModeVertexParallelismStore(
+                        Collections.singleton(jobVertex), false);
+
+        VertexParallelismInformation info = 
store.getParallelismInfo(jobVertex.getID());
+
+        Assert.assertEquals("parallelism is not adjusted", parallelism, 
info.getParallelism());
+        Assert.assertEquals("expected max", expectedMaxParallelism, 
info.getMaxParallelism());
+
+        Assert.assertEquals(
+                "can rescale max",
+                expectedCanRescaleTo,
+                info.canRescaleMaxParallelism(maxToScaleTo));
+    }
+
+    @Test
+    public void testCreateStoreWithAdjustedParallelism() {
+        JobVertex jobVertex = createNoOpVertex("test", parallelism, 
maxParallelism);
+        VertexParallelismStore store =
+                AdaptiveScheduler.computeReactiveModeVertexParallelismStore(
+                        Collections.singleton(jobVertex), true);
+
+        VertexParallelismInformation info = 
store.getParallelismInfo(jobVertex.getID());
+
+        Assert.assertEquals(
+                "parallelism is adjusted to max", expectedMaxParallelism, 
info.getParallelism());
+        Assert.assertEquals("expected max", expectedMaxParallelism, 
info.getMaxParallelism());
+
+        Assert.assertEquals(
+                "can rescale max",
+                expectedCanRescaleTo,
+                info.canRescaleMaxParallelism(maxToScaleTo));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 0228427..8859dbb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -30,11 +30,11 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -53,11 +53,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
@@ -66,9 +63,11 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
 import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.ResourceCounter;
@@ -87,6 +86,8 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -96,8 +97,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup;
@@ -251,9 +254,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         final int numAvailableSlots = 1;
 
-        final OneShotLatch submitTaskLatch = new OneShotLatch();
-        final TaskManagerGateway taskManagerGateway =
-                
createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch);
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
 
         singleThreadMainThreadExecutor.execute(
                 () -> {
@@ -266,7 +268,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
                             taskManagerGateway);
                 });
 
-        submitTaskLatch.await();
+        // wait for all tasks to be submitted
+        taskManagerGateway.waitForSubmissions(numAvailableSlots, 
Duration.ofSeconds(5));
 
         final ArchivedExecutionGraph executionGraph =
                 CompletableFuture.supplyAsync(
@@ -287,15 +290,18 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
+
         final AdaptiveScheduler adaptiveScheduler =
                 new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
                         .setInitializationTimestamp(initializationTimestamp)
                         .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
                         .build();
 
-        final OneShotLatch submitTaskLatch = new OneShotLatch();
-        final TaskManagerGateway taskManagerGateway =
-                
createWaitingForTaskSubmissionTaskManagerGateway(submitTaskLatch);
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(PARALLELISM);
 
         singleThreadMainThreadExecutor.execute(
                 () -> {
@@ -308,7 +314,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
                             taskManagerGateway);
                 });
 
-        submitTaskLatch.await();
+        // Wait for just the first submission to indicate the execution graph 
is ready
+        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
 
         final ArchivedExecutionGraph executionGraph =
                 CompletableFuture.supplyAsync(
@@ -430,12 +437,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         final Gauge<Integer> numRestartsMetric = numRestartsMetricFuture.get();
 
-        final SimpleAckingTaskManagerGateway taskManagerGateway =
-                new SimpleAckingTaskManagerGateway();
-        final BlockingQueue<AllocationID> submittedTasks = new 
ArrayBlockingQueue<>(5);
-        taskManagerGateway.setSubmitConsumer(
-                taskDeploymentDescriptor ->
-                        
submittedTasks.offer(taskDeploymentDescriptor.getAllocationId()));
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
 
         taskManagerGateway.setCancelConsumer(
                 executionAttemptId ->
@@ -459,7 +462,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
                 });
 
         // wait for the first task submission
-        submittedTasks.take();
+        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
 
         assertThat(numRestartsMetric.getValue(), is(0));
 
@@ -474,8 +477,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
                             taskManagerGateway);
                 });
 
-        // wait for the second task submission
-        submittedTasks.take();
+        // wait for the second task submissions
+        taskManagerGateway.waitForSubmissions(PARALLELISM, 
Duration.ofSeconds(5));
 
         assertThat(numRestartsMetric.getValue(), is(1));
     }
@@ -495,7 +498,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testStartSchedulingSetsResourceRequirements() throws Exception 
{
+    public void testStartSchedulingSetsResourceRequirementsForDefaultMode() 
throws Exception {
         final JobGraph jobGraph = createJobGraph();
 
         final DefaultDeclarativeSlotPool declarativeSlotPool =
@@ -513,27 +516,78 @@ public class AdaptiveSchedulerTest extends TestLogger {
                 contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, 
PARALLELISM)));
     }
 
-    /** Tests that the listener for new slots is properly set up. */
     @Test
-    public void testResourceAcquisitionTriggersJobExecution() throws Exception 
{
+    public void testStartSchedulingSetsResourceRequirementsForReactiveMode() 
throws Exception {
         final JobGraph jobGraph = createJobGraph();
 
         final DefaultDeclarativeSlotPool declarativeSlotPool =
                 createDeclarativeSlotPool(jobGraph.getJobID());
 
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                         .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
                         .build();
 
         scheduler.startScheduling();
 
-        offerSlots(
-                declarativeSlotPool,
-                createSlotOffersForResourceRequirements(
-                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
PARALLELISM)));
+        // should request the max possible resources
+        final int expectedParallelism =
+                
KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM);
+        assertThat(
+                declarativeSlotPool.getResourceRequirements(),
+                contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, 
expectedParallelism)));
+    }
+
+    /** Tests that the listener for new slots is properly set up. */
+    @Test
+    public void testResourceAcquisitionTriggersJobExecution() throws Exception 
{
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, 
Duration.ofMillis(1L));
 
-        assertThat(scheduler.getState(), 
instanceOf(CreatingExecutionGraph.class));
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, 
singleThreadMainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(PARALLELISM);
+
+        CompletableFuture<State> startingStateFuture = new 
CompletableFuture<>();
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+                    startingStateFuture.complete(scheduler.getState());
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, 
PARALLELISM)),
+                            taskManagerGateway);
+                });
+
+        assertThat(startingStateFuture.get(), 
instanceOf(WaitingForResources.class));
+
+        // Wait for all tasks to be submitted
+        taskManagerGateway.waitForSubmissions(PARALLELISM, 
Duration.ofSeconds(5));
+
+        final ArchivedExecutionGraph executionGraph =
+                CompletableFuture.supplyAsync(
+                                () -> 
scheduler.requestJob().getArchivedExecutionGraph(),
+                                singleThreadMainThreadExecutor)
+                        .get();
+
+        assertThat(
+                
executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(), 
is(PARALLELISM));
     }
 
     @Test
@@ -752,22 +806,6 @@ public class AdaptiveSchedulerTest extends TestLogger {
         scheduler.requestPartitionState(new IntermediateDataSetID(), new 
ResultPartitionID());
     }
 
-    @Nonnull
-    private TaskManagerGateway 
createWaitingForTaskSubmissionTaskManagerGateway(
-            OneShotLatch submitTaskLatch) {
-        final TaskManagerGateway taskManagerGateway =
-                SlotPoolTestUtils.createTaskManagerGateway(
-                        new TestingTaskExecutorGatewayBuilder()
-                                .setSubmitTaskConsumer(
-                                        (taskDeploymentDescriptor, 
jobMasterId) -> {
-                                            submitTaskLatch.trigger();
-                                            return 
CompletableFuture.completedFuture(
-                                                    Acknowledge.get());
-                                        })
-                                .createTestingTaskExecutorGateway());
-        return taskManagerGateway;
-    }
-
     @Test
     public void 
testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable()
             throws Exception {
@@ -791,6 +829,41 @@ public class AdaptiveSchedulerTest extends TestLogger {
         assertFalse(assignmentResult.isSuccess());
     }
 
+    @Test
+    public void testComputeVertexParallelismStoreForExecutionInReactiveMode() {
+        JobVertex v1 = createNoOpVertex("v1", 1, 50);
+        JobVertex v2 = createNoOpVertex("v2", 50, 50);
+        JobGraph graph = JobGraphTestUtils.streamingJobGraph(v1, v2);
+
+        VertexParallelismStore parallelismStore =
+                AdaptiveScheduler.computeVertexParallelismStoreForExecution(
+                        graph, SchedulerExecutionMode.REACTIVE);
+        // even in REACTIVE mode the store for execution must report each vertex's own settings
+        for (JobVertex vertex : graph.getVertices()) {
+            VertexParallelismInformation info = 
parallelismStore.getParallelismInfo(vertex.getID());
+
+            assertThat(info.getParallelism(), is(vertex.getParallelism()));
+            assertThat(info.getMaxParallelism(), 
is(vertex.getMaxParallelism()));
+        }
+    }
+
+    @Test
+    public void testComputeVertexParallelismStoreForExecutionInDefaultMode() {
+        JobVertex v1 = createNoOpVertex("v1", 1, 50);
+        JobVertex v2 = createNoOpVertex("v2", 50, 50);
+        JobGraph graph = JobGraphTestUtils.streamingJobGraph(v1, v2);
+
+        VertexParallelismStore parallelismStore =
+                
AdaptiveScheduler.computeVertexParallelismStoreForExecution(graph, null);
+        // null mode stands in for the default (non-reactive) mode here, going by the test name
+        for (JobVertex vertex : graph.getVertices()) {
+            VertexParallelismInformation info = 
parallelismStore.getParallelismInfo(vertex.getID());
+
+            assertThat(info.getParallelism(), is(vertex.getParallelism()));
+            assertThat(info.getMaxParallelism(), 
is(vertex.getMaxParallelism()));
+        }
+    }
+
     // 
---------------------------------------------------------------------------------------------
     // Utils
     // 
---------------------------------------------------------------------------------------------
@@ -823,18 +896,44 @@ public class AdaptiveSchedulerTest extends TestLogger {
             onLeaveCalled = true;
             onLeaveNewStateArgument = newState;
         }
+    }
 
-        private static class Factory implements 
StateFactory<LifecycleMethodCapturingState> {
+    /**
+     * A {@link SimpleAckingTaskManagerGateway} that buffers all the task 
submissions into a
+     * blocking queue, allowing one to wait for an arbitrary number of 
submissions.
+     */
+    private static class SubmissionBufferingTaskManagerGateway
+            extends SimpleAckingTaskManagerGateway {
+        final BlockingQueue<TaskDeploymentDescriptor> submittedTasks;
 
-            @Override
-            public Class<LifecycleMethodCapturingState> getStateClass() {
-                return LifecycleMethodCapturingState.class;
-            }
+        public SubmissionBufferingTaskManagerGateway(int capacity) {
+            submittedTasks = new ArrayBlockingQueue<>(capacity); // bounded submission buffer
+            super.setSubmitConsumer(submittedTasks::offer); // offer() drops entries once full
+        }
 
-            @Override
-            public LifecycleMethodCapturingState getState() {
-                return new LifecycleMethodCapturingState();
+        @Override
+        public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> 
submitConsumer) {
+            super.setSubmitConsumer(
+                    ((Consumer<TaskDeploymentDescriptor>) 
submittedTasks::offer)
+                            .andThen(submitConsumer)); // buffer first, then run the given consumer
+        }
+
+        /**
+         * Block until the given number of submissions have been received.
+         *
+         * @param numSubmissions The number of submissions to wait for
+         * @param perTaskTimeout The max amount of time to wait for each single submission
+         * @return the list of the waited-for submissions, in arrival order, without null entries
+         * @throws InterruptedException if the thread is interrupted while waiting
+         * @throws IllegalStateException if a submission does not arrive within the timeout
+         */
+        public List<TaskDeploymentDescriptor> waitForSubmissions(
+                int numSubmissions, Duration perTaskTimeout) throws InterruptedException {
+            List<TaskDeploymentDescriptor> descriptors = new ArrayList<>();
+            for (int i = 0; i < numSubmissions; i++) {
+                TaskDeploymentDescriptor descriptor =
+                        submittedTasks.poll(perTaskTimeout.toMillis(), TimeUnit.MILLISECONDS);
+                if (descriptor == null) {
+                    // poll() reports a timeout by returning null; fail here with a clear message
+                    // instead of handing null entries back to the calling test.
+                    throw new IllegalStateException(
+                            "Timed out after "
+                                    + perTaskTimeout
+                                    + " waiting for task submission "
+                                    + (i + 1)
+                                    + " of "
+                                    + numSubmissions);
+                }
+                descriptors.add(descriptor);
             }
+            return descriptors;
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index b60f0e2..683b915 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -51,6 +51,7 @@ import 
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.P
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
@@ -68,6 +69,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
@@ -694,6 +696,7 @@ public class ExecutingTest extends TestLogger {
                     1,
                     Time.milliseconds(1L),
                     1L,
+                    new DefaultVertexParallelismInfo(1, 1, max -> 
Optional.empty()),
                     new 
DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
             mockExecutionVertex = new MockExecutionVertex(this);
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java
index 27f77cc..92419c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/topology/BuildExecutionGraphBenchmark.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
 
 import java.util.List;
@@ -43,8 +45,13 @@ public class BuildExecutionGraphBenchmark {
     public void setup(JobConfiguration jobConfiguration) throws Exception {
         jobVertices = createDefaultJobVertices(jobConfiguration);
         final JobGraph jobGraph = createJobGraph(jobConfiguration);
+        final VertexParallelismStore parallelismStore =
+                SchedulerBase.computeVertexParallelismStore(jobVertices);
         executionGraph =
-                TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setVertexParallelismStore(parallelismStore)
+                        .setJobGraph(jobGraph)
+                        .build();
     }
 
     public void buildTopology() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index afd69e6..1484d67 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuild
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -116,7 +117,11 @@ public class RescalePartitionerTest extends StreamPartitionerTest {
         assertEquals(4, mapVertex.getParallelism());
         assertEquals(2, sinkVertex.getParallelism());
 
-        ExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().build();
+        ExecutionGraph eg =
+                TestingDefaultExecutionGraphBuilder.newBuilder()
+                        .setVertexParallelismStore(
+                                SchedulerBase.computeVertexParallelismStore(jobGraph))
+                        .build();
 
         try {
             eg.attachJobGraph(jobVertices);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index cc7b3c3..26de8ec 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -36,8 +36,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -234,7 +234,7 @@ public class RescalingITCase extends TestLogger {
             }
 
             int restoreMaxParallelism =
-                    deriveMaxParallelism ? ExecutionJobVertex.VALUE_NOT_SET : maxParallelism;
+                    deriveMaxParallelism ? JobVertex.MAX_PARALLELISM_DEFAULT : maxParallelism;
 
             JobGraph scaledJobGraph =
                     createJobGraphWithKeyedState(

Reply via email to