This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 416cb7aaa02c176e01485ff11ab4269f76b5e9e2
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Tue Feb 28 14:22:28 2023 +0000

    [FLINK-21450][runtime] Restructure types passed between AdaptiveScheduler 
and SlotAssigner
    
    Slot assignments are computed and consumed by SlotAllocator.
    This is expressed implicitly by extending VertexParallelism.
    
    This change tries to make that clear, while still allowing slots to be
    assigned to something other than Slot Sharing Groups.
    
    It does so by:
    1. Introducing JobSchedulingPlan, computed and consumed by SlotAllocator. It 
couples VertexParallelism with slot assignments
    2. Introducing a determineParallelismAndCalculateAssignment method in addition 
to determineParallelism, specifically for assignments
    3. Pushing the polymorphism of slot assignments from VertexParallelism into 
the JobSchedulingPlan (slot assignment target)
---
 .../jobgraph/jsonplan/JsonPlanGenerator.java       |  21 +---
 .../scheduler/adaptive/AdaptiveScheduler.java      |  28 +++---
 .../scheduler/adaptive/CreatingExecutionGraph.java |  14 ++-
 .../scheduler/adaptive/JobSchedulingPlan.java      | 105 +++++++++++++++++++
 .../adaptive/allocator/SlotAllocator.java          |  16 ++-
 .../allocator/SlotSharingSlotAllocator.java        | 111 +++++++++------------
 .../adaptive/allocator/VertexParallelism.java      |  43 ++++++--
 .../VertexParallelismWithSlotSharing.java          |  52 ----------
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |   3 +-
 .../adaptive/CreatingExecutionGraphTest.java       |  29 ++----
 .../allocator/SlotSharingSlotAllocatorTest.java    |  62 ++++++------
 .../adaptive/allocator/TestingSlotAllocator.java   |  28 +++---
 12 files changed, 276 insertions(+), 236 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
index 8817c72282b..6cdcc77cc53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
@@ -33,27 +33,13 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator
 import org.apache.commons.text.StringEscapeUtils;
 
 import java.io.StringWriter;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 @Internal
 public class JsonPlanGenerator {
 
     private static final String NOT_SET = "";
     private static final String EMPTY = "{}";
-    private static final VertexParallelism EMPTY_VERTEX_PARALLELISM =
-            new VertexParallelism() {
-                @Override
-                public Map<JobVertexID, Integer> 
getMaxParallelismForVertices() {
-                    return Collections.emptyMap();
-                }
-
-                @Override
-                public int getParallelism(JobVertexID jobVertexId) {
-                    return -1;
-                }
-            };
 
     public static String generatePlan(JobGraph jg) {
         return generatePlan(
@@ -61,7 +47,7 @@ public class JsonPlanGenerator {
                 jg.getName(),
                 jg.getJobType(),
                 jg.getVertices(),
-                EMPTY_VERTEX_PARALLELISM);
+                VertexParallelism.empty());
     }
 
     public static String generatePlan(
@@ -116,11 +102,12 @@ public class JsonPlanGenerator {
 
                 // write the core properties
                 JobVertexID vertexID = vertex.getID();
-                int storeParallelism = 
vertexParallelism.getParallelism(vertexID);
                 gen.writeStringField("id", vertexID.toString());
                 gen.writeNumberField(
                         "parallelism",
-                        storeParallelism != -1 ? storeParallelism : 
vertex.getParallelism());
+                        vertexParallelism
+                                .getParallelismOptional(vertexID)
+                                .orElse(vertex.getParallelism()));
                 gen.writeStringField("operator", operator);
                 gen.writeStringField("operator_strategy", operatorDescr);
                 gen.writeStringField("description", description);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 0bdbb44da7c..a876bb65eb2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -760,11 +760,12 @@ public class AdaptiveScheduler
                 .isPresent();
     }
 
-    private VertexParallelism determineParallelism(SlotAllocator slotAllocator)
+    private JobSchedulingPlan determineParallelism(SlotAllocator slotAllocator)
             throws NoResourceAvailableException {
 
         return slotAllocator
-                .determineParallelism(jobInformation, 
declarativeSlotPool.getFreeSlotsInformation())
+                .determineParallelismAndCalculateAssignment(
+                        jobInformation, 
declarativeSlotPool.getFreeSlotsInformation())
                 .orElseThrow(
                         () ->
                                 new NoResourceAvailableException(
@@ -932,11 +933,11 @@ public class AdaptiveScheduler
 
     private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
             createExecutionGraphWithAvailableResourcesAsync() {
-        final VertexParallelism vertexParallelism;
+        final JobSchedulingPlan schedulingPlan;
         final VertexParallelismStore adjustedParallelismStore;
 
         try {
-            vertexParallelism = determineParallelism(slotAllocator);
+            schedulingPlan = determineParallelism(slotAllocator);
             JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
 
             for (JobVertex vertex : adjustedJobGraph.getVertices()) {
@@ -944,7 +945,7 @@ public class AdaptiveScheduler
 
                 // use the determined "available parallelism" to use
                 // the resources we have access to
-                vertex.setParallelism(vertexParallelism.getParallelism(id));
+                
vertex.setParallelism(schedulingPlan.getVertexParallelism().getParallelism(id));
             }
 
             // use the originally configured max parallelism
@@ -966,7 +967,7 @@ public class AdaptiveScheduler
                 .thenApply(
                         executionGraph ->
                                 
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
-                                        executionGraph, vertexParallelism));
+                                        executionGraph, schedulingPlan));
     }
 
     @Override
@@ -982,10 +983,10 @@ public class AdaptiveScheduler
         executionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
 
-        final VertexParallelism vertexParallelism =
-                executionGraphWithVertexParallelism.getVertexParallelism();
+        final JobSchedulingPlan jobSchedulingPlan =
+                executionGraphWithVertexParallelism.getJobSchedulingPlan();
         return slotAllocator
-                .tryReserveResources(vertexParallelism)
+                .tryReserveResources(jobSchedulingPlan)
                 .map(
                         reservedSlots ->
                                 
CreatingExecutionGraph.AssignmentResult.success(
@@ -1050,14 +1051,14 @@ public class AdaptiveScheduler
         int availableSlots = 
declarativeSlotPool.getFreeSlotsInformation().size();
 
         if (availableSlots > 0) {
-            final Optional<? extends VertexParallelism> 
potentialNewParallelism =
+            final Optional<VertexParallelism> potentialNewParallelism =
                     slotAllocator.determineParallelism(
                             jobInformation, 
declarativeSlotPool.getAllSlotsInformation());
 
             if (potentialNewParallelism.isPresent()) {
                 int currentCumulativeParallelism = 
getCurrentCumulativeParallelism(executionGraph);
                 int newCumulativeParallelism =
-                        
getCumulativeParallelism(potentialNewParallelism.get());
+                        
potentialNewParallelism.get().getCumulativeParallelism();
                 if (newCumulativeParallelism > currentCumulativeParallelism) {
                     LOG.debug(
                             "Offering scale up to scale up controller with 
currentCumulativeParallelism={}, newCumulativeParallelism={}",
@@ -1077,11 +1078,6 @@ public class AdaptiveScheduler
                 .reduce(0, Integer::sum);
     }
 
-    private static int getCumulativeParallelism(VertexParallelism 
potentialNewParallelism) {
-        return 
potentialNewParallelism.getMaxParallelismForVertices().values().stream()
-                .reduce(0, Integer::sum);
-    }
-
     @Override
     public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index c87e58b2971..f19145f0844 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -339,16 +339,16 @@ public class CreatingExecutionGraph implements State {
     static class ExecutionGraphWithVertexParallelism {
         private final ExecutionGraph executionGraph;
 
-        private final VertexParallelism vertexParallelism;
+        private final JobSchedulingPlan jobSchedulingPlan;
 
         private ExecutionGraphWithVertexParallelism(
-                ExecutionGraph executionGraph, VertexParallelism 
vertexParallelism) {
+                ExecutionGraph executionGraph, JobSchedulingPlan 
jobSchedulingPlan) {
             this.executionGraph = executionGraph;
-            this.vertexParallelism = vertexParallelism;
+            this.jobSchedulingPlan = jobSchedulingPlan;
         }
 
         public static ExecutionGraphWithVertexParallelism create(
-                ExecutionGraph executionGraph, VertexParallelism 
vertexParallelism) {
+                ExecutionGraph executionGraph, JobSchedulingPlan 
vertexParallelism) {
             return new ExecutionGraphWithVertexParallelism(executionGraph, 
vertexParallelism);
         }
 
@@ -357,7 +357,11 @@ public class CreatingExecutionGraph implements State {
         }
 
         public VertexParallelism getVertexParallelism() {
-            return vertexParallelism;
+            return jobSchedulingPlan.getVertexParallelism();
+        }
+
+        public JobSchedulingPlan getJobSchedulingPlan() {
+            return jobSchedulingPlan;
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobSchedulingPlan.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobSchedulingPlan.java
new file mode 100644
index 00000000000..cde53948d16
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobSchedulingPlan.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A plan that describes how to execute {@link 
org.apache.flink.runtime.jobgraph.JobGraph JobGraph}.
+ *
+ * <ol>
+ *   <li>{@link #vertexParallelism} is necessary to create {@link
+ *       org.apache.flink.runtime.executiongraph.ExecutionGraph ExecutionGraph}
+ *   <li>{@link #slotAssignments} are used to schedule it onto the cluster
+ * </ol>
+ *
+ * {@link AdaptiveScheduler} passes this structure from {@link 
WaitingForResources} to {@link
+ * CreatingExecutionGraph} stages.
+ */
+@Internal
+public class JobSchedulingPlan {
+    private final VertexParallelism vertexParallelism;
+    private final Collection<SlotAssignment> slotAssignments;
+
+    public JobSchedulingPlan(
+            VertexParallelism vertexParallelism, Collection<SlotAssignment> 
slotAssignments) {
+        this.vertexParallelism = vertexParallelism;
+        this.slotAssignments = slotAssignments;
+    }
+
+    public VertexParallelism getVertexParallelism() {
+        return vertexParallelism;
+    }
+
+    public Collection<SlotAssignment> getSlotAssignments() {
+        return slotAssignments;
+    }
+
+    /**
+     * Create an empty {@link JobSchedulingPlan} with no information about 
vertices or allocations.
+     */
+    public static JobSchedulingPlan empty() {
+        return new JobSchedulingPlan(VertexParallelism.empty(), 
Collections.emptyList());
+    }
+
+    /** Assignment of a slot to some target (e.g. a slot sharing group). */
+    public static class SlotAssignment {
+        private final SlotInfo slotInfo;
+        /**
+         * Interpreted by {@link
+         * 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator#tryReserveResources(JobSchedulingPlan)}.
+         * This can be a slot sharing group, a task, or something else.
+         */
+        private final Object target;
+
+        public SlotAssignment(SlotInfo slotInfo, Object target) {
+            this.slotInfo = slotInfo;
+            this.target = target;
+        }
+
+        public SlotInfo getSlotInfo() {
+            return slotInfo;
+        }
+
+        public Object getTarget() {
+            return target;
+        }
+
+        public <T> T getTargetAs(Class<T> clazz) {
+            return clazz.cast(getTarget()); // checked cast: fails fast on a wrong target type
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "SlotAssignment: %s, target: %s", 
slotInfo.getAllocationId(), target);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "JobSchedulingPlan: parallelism: %s, assignments: %s",
+                vertexParallelism, slotAssignments);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
index 9112b52e876..fea0d659ad3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import org.apache.flink.runtime.util.ResourceCounter;
 
 import java.util.Collection;
@@ -43,7 +44,7 @@ public interface SlotAllocator {
      * given job information.
      *
      * <p>Implementations of this method must be side-effect free. There is no 
guarantee that the
-     * result of this method is ever passed to {@link 
#tryReserveResources(VertexParallelism)}.
+     * result of this method is ever passed to {@link 
#tryReserveResources(JobSchedulingPlan)}.
      *
      * @param jobInformation information about the job graph
      * @param slots slots to consider for determining the parallelism
@@ -51,7 +52,14 @@ public interface SlotAllocator {
      *     how the vertices could be assigned to slots, if all vertices could 
be run with the given
      *     slots
      */
-    Optional<? extends VertexParallelism> determineParallelism(
+    Optional<VertexParallelism> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
slots);
+
+    /**
+     * Same as {@link #determineParallelism(JobInformation, Collection)} but 
additionally determines
+     * the assignment of slots to execution slot sharing groups.
+     */
+    Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
             JobInformation jobInformation, Collection<? extends SlotInfo> 
slots);
 
     /**
@@ -59,9 +67,9 @@ public interface SlotAllocator {
      * resources has changed and the reservation with respect to 
vertexParallelism is no longer
      * possible, then this method returns {@link Optional#empty()}.
      *
-     * @param vertexParallelism information on how slots should be assigned to 
the slots
+     * @param jobSchedulingPlan the plan describing how slots should be assigned to 
their targets
      * @return Set of reserved slots if the reservation was successful; 
otherwise {@link
      *     Optional#empty()}
      */
-    Optional<ReservedSlots> tryReserveResources(VertexParallelism 
vertexParallelism);
+    Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan 
jobSchedulingPlan);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index ce4c6bfb1af..2b7a9c362ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -26,9 +26,10 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
@@ -93,7 +94,7 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     }
 
     @Override
-    public Optional<VertexParallelismWithSlotSharing> determineParallelism(
+    public Optional<VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
 
         // => less slots than slot-sharing groups
@@ -104,9 +105,6 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism =
                 determineSlotsPerSharingGroup(jobInformation, 
freeSlots.size());
 
-        final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
-
-        final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new 
ArrayList<>();
         final Map<JobVertexID, Integer> allVertexParallelism = new HashMap<>();
 
         for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
@@ -120,21 +118,37 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
                             containedJobVertices,
                             slotSharingGroupParallelism.get(
                                     slotSharingGroup.getSlotSharingGroupId()));
+            allVertexParallelism.putAll(vertexParallelism);
+        }
+        return Optional.of(new VertexParallelism(allVertexParallelism));
+    }
 
-            final Iterable<ExecutionSlotSharingGroup> 
sharedSlotToVertexAssignment =
-                    createExecutionSlotSharingGroups(vertexParallelism);
-
-            for (ExecutionSlotSharingGroup executionSlotSharingGroup :
-                    sharedSlotToVertexAssignment) {
-                final SlotInfo slotInfo = slotIterator.next();
+    @Override
+    public Optional<JobSchedulingPlan> 
determineParallelismAndCalculateAssignment(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
slots) {
+        return determineParallelism(jobInformation, slots)
+                .map(
+                        parallelism ->
+                                new JobSchedulingPlan(
+                                        parallelism,
+                                        assignSlots(jobInformation, slots, 
parallelism)));
+    }
 
-                assignments.add(
-                        new 
ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
-            }
-            allVertexParallelism.putAll(vertexParallelism);
+    private Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism) {
+        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
         }
 
-        return Optional.of(new 
VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
+        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Collection<SlotAssignment> assignments = new ArrayList<>();
+        for (ExecutionSlotSharingGroup group : allGroups) {
+            assignments.add(new SlotAssignment(iterator.next(), group));
+        }
+        return assignments;
     }
 
     /**
@@ -186,15 +200,16 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         return vertexParallelism;
     }
 
-    private static Iterable<ExecutionSlotSharingGroup> 
createExecutionSlotSharingGroups(
-            Map<JobVertexID, Integer> containedJobVertices) {
+    private static List<ExecutionSlotSharingGroup> 
createExecutionSlotSharingGroups(
+            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
         final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
 
-        for (Map.Entry<JobVertexID, Integer> jobVertex : 
containedJobVertices.entrySet()) {
-            for (int i = 0; i < jobVertex.getValue(); i++) {
+        for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
+            int parallelism = vertexParallelism.getParallelism(jobVertexId);
+            for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
                 sharedSlotToVertexAssignment
-                        .computeIfAbsent(i, ignored -> new HashSet<>())
-                        .add(new ExecutionVertexID(jobVertex.getKey(), i));
+                        .computeIfAbsent(subtaskIdx, ignored -> new 
HashSet<>())
+                        .add(new ExecutionVertexID(jobVertexId, subtaskIdx));
             }
         }
 
@@ -204,34 +219,20 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     }
 
     @Override
-    public Optional<ReservedSlots> tryReserveResources(VertexParallelism 
vertexParallelism) {
-        Preconditions.checkArgument(
-                vertexParallelism instanceof VertexParallelismWithSlotSharing,
-                String.format(
-                        "%s expects %s as argument.",
-                        SlotSharingSlotAllocator.class.getSimpleName(),
-                        
VertexParallelismWithSlotSharing.class.getSimpleName()));
-
-        final VertexParallelismWithSlotSharing 
vertexParallelismWithSlotSharing =
-                (VertexParallelismWithSlotSharing) vertexParallelism;
-
+    public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan 
jobSchedulingPlan) {
         final Collection<AllocationID> expectedSlots =
-                
calculateExpectedSlots(vertexParallelismWithSlotSharing.getAssignments());
+                calculateExpectedSlots(jobSchedulingPlan.getSlotAssignments());
 
         if (areAllExpectedSlotsAvailableAndFree(expectedSlots)) {
             final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new 
HashMap<>();
 
-            for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
-                    vertexParallelismWithSlotSharing.getAssignments()) {
-                final SharedSlot sharedSlot =
-                        
reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
-
+            for (SlotAssignment assignment : 
jobSchedulingPlan.getSlotAssignments()) {
+                final SharedSlot sharedSlot = 
reserveSharedSlot(assignment.getSlotInfo());
                 for (ExecutionVertexID executionVertexId :
-                        executionSlotSharingGroup
-                                .getExecutionSlotSharingGroup()
+                        assignment
+                                .getTargetAs(ExecutionSlotSharingGroup.class)
                                 .getContainedExecutionVertices()) {
-                    final LogicalSlot logicalSlot = 
sharedSlot.allocateLogicalSlot();
-                    assignedSlots.put(executionVertexId, logicalSlot);
+                    assignedSlots.put(executionVertexId, 
sharedSlot.allocateLogicalSlot());
                 }
             }
 
@@ -242,11 +243,10 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     }
 
     @Nonnull
-    private Collection<AllocationID> calculateExpectedSlots(
-            Iterable<? extends ExecutionSlotSharingGroupAndSlot> assignments) {
+    private Collection<AllocationID> 
calculateExpectedSlots(Iterable<SlotAssignment> assignments) {
         final Collection<AllocationID> requiredSlots = new ArrayList<>();
 
-        for (ExecutionSlotSharingGroupAndSlot assignment : assignments) {
+        for (SlotAssignment assignment : assignments) {
             requiredSlots.add(assignment.getSlotInfo().getAllocationId());
         }
         return requiredSlots;
@@ -288,23 +288,4 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
             return containedExecutionVertices;
         }
     }
-
-    static class ExecutionSlotSharingGroupAndSlot {
-        private final ExecutionSlotSharingGroup executionSlotSharingGroup;
-        private final SlotInfo slotInfo;
-
-        public ExecutionSlotSharingGroupAndSlot(
-                ExecutionSlotSharingGroup executionSlotSharingGroup, SlotInfo 
slotInfo) {
-            this.executionSlotSharingGroup = executionSlotSharingGroup;
-            this.slotInfo = slotInfo;
-        }
-
-        public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() {
-            return executionSlotSharingGroup;
-        }
-
-        public SlotInfo getSlotInfo() {
-            return slotInfo;
-        }
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
index 0a19a3217ed..dcc3c10fd18 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java
@@ -20,17 +20,44 @@ package 
org.apache.flink.runtime.scheduler.adaptive.allocator;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * Core result of {@link SlotAllocator#determineParallelism(JobInformation, 
Collection)}, describing
- * the parallelism each vertex could be scheduled with.
- *
- * <p>{@link SlotAllocator} implementations may encode additional information 
to be used in {@link
- * SlotAllocator#tryReserveResources(VertexParallelism)}.
+ * Core result of {@link SlotAllocator#determineParallelism(JobInformation, 
Collection)} along with
+ * {@link 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment
+ * slotAssignments}, describing the parallelism each vertex could be scheduled 
with.
  */
-public interface VertexParallelism {
-    Map<JobVertexID, Integer> getMaxParallelismForVertices();
+public class VertexParallelism {
+    private final Map<JobVertexID, Integer> parallelismForVertices;
+
+    public VertexParallelism(Map<JobVertexID, Integer> parallelismForVertices) 
{
+        this.parallelismForVertices = parallelismForVertices;
+    }
+
+    public int getParallelism(JobVertexID jobVertexId) {
+        checkArgument(
+                parallelismForVertices.containsKey(jobVertexId), "Unknown 
vertex: " + jobVertexId);
+        return parallelismForVertices.get(jobVertexId);
+    }
+
+    public Optional<Integer> getParallelismOptional(JobVertexID jobVertexId) {
+        return Optional.ofNullable(parallelismForVertices.get(jobVertexId));
+    }
+
+    public int getCumulativeParallelism() {
+        return parallelismForVertices.values().stream().reduce(0, 
Integer::sum);
+    }
+
+    @Override
+    public String toString() {
+        return "VertexParallelism: " + parallelismForVertices;
+    }
 
-    int getParallelism(JobVertexID jobVertexId);
+    public static VertexParallelism empty() {
+        return new VertexParallelism(Collections.emptyMap());
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java
deleted file mode 100644
index 05bcadf2728..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive.allocator;
-
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-import java.util.Map;
-
-/** {@link VertexParallelism} implementation for the {@link 
SlotSharingSlotAllocator}. */
-public class VertexParallelismWithSlotSharing implements VertexParallelism {
-
-    private final Map<JobVertexID, Integer> vertexParallelism;
-    private final 
Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> 
assignments;
-
-    VertexParallelismWithSlotSharing(
-            Map<JobVertexID, Integer> vertexParallelism,
-            
Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> 
assignments) {
-        this.vertexParallelism = vertexParallelism;
-        this.assignments = Preconditions.checkNotNull(assignments);
-    }
-
-    Iterable<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> 
getAssignments() {
-        return assignments;
-    }
-
-    @Override
-    public Map<JobVertexID, Integer> getMaxParallelismForVertices() {
-        return vertexParallelism;
-    }
-
-    @Override
-    public int getParallelism(JobVertexID jobVertexId) {
-        return vertexParallelism.get(jobVertexId);
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index e6d457a8ad7..978750ecf3f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -1408,8 +1408,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final CreatingExecutionGraph.AssignmentResult assignmentResult =
                 adaptiveScheduler.tryToAssignSlots(
                         
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
-                                new StateTrackingMockExecutionGraph(),
-                                new 
CreatingExecutionGraphTest.TestingVertexParallelism()));
+                                new StateTrackingMockExecutionGraph(), 
JobSchedulingPlan.empty()));
 
         assertThat(assignmentResult.isSuccess()).isFalse();
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index 85a4fe4f3ab..54ebdc11702 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -25,13 +25,11 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
-import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -45,7 +43,6 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
@@ -158,8 +155,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
             context.setExpectWaitingForResources();
 
             executionGraphWithVertexParallelismFuture.complete(
-                    
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
-                            new StateTrackingMockExecutionGraph(), new 
TestingVertexParallelism()));
+                    getGraph(new StateTrackingMockExecutionGraph()));
         }
     }
 
@@ -183,9 +179,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     actualExecutionGraph ->
                             
assertThat(actualExecutionGraph).isEqualTo(executionGraph));
 
-            executionGraphWithVertexParallelismFuture.complete(
-                    
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
-                            executionGraph, new TestingVertexParallelism()));
+            
executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
         }
     }
 
@@ -214,9 +208,7 @@ public class CreatingExecutionGraphTest extends TestLogger {
                     actualExecutionGraph ->
                             
assertThat(actualExecutionGraph).isEqualTo(executionGraph));
 
-            executionGraphWithVertexParallelismFuture.complete(
-                    
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
-                            executionGraph, new TestingVertexParallelism()));
+            
executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
 
             
assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context);
         }
@@ -353,16 +345,9 @@ public class CreatingExecutionGraphTest extends TestLogger 
{
         }
     }
 
-    static final class TestingVertexParallelism implements VertexParallelism {
-
-        @Override
-        public Map<JobVertexID, Integer> getMaxParallelismForVertices() {
-            throw new UnsupportedOperationException("Is not supported");
-        }
-
-        @Override
-        public int getParallelism(JobVertexID jobVertexId) {
-            throw new UnsupportedOperationException("Is not supported");
-        }
+    private static CreatingExecutionGraph.ExecutionGraphWithVertexParallelism 
getGraph(
+            StateTrackingMockExecutionGraph executionGraph) {
+        return 
CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
+                executionGraph, JobSchedulingPlan.empty());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 8be500919f1..7643acc7410 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.TestLogger;
@@ -96,15 +98,12 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
 
-        final VertexParallelism slotSharingAssignments =
+        final VertexParallelism vertexParallelism =
                 slotAllocator.determineParallelism(jobInformation, 
getSlots(2)).get();
 
-        final Map<JobVertexID, Integer> maxParallelismForVertices =
-                slotSharingAssignments.getMaxParallelismForVertices();
-
-        assertThat(maxParallelismForVertices.get(vertex1.getJobVertexID()), 
is(1));
-        assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()), 
is(1));
-        assertThat(maxParallelismForVertices.get(vertex3.getJobVertexID()), 
is(1));
+        assertThat(vertexParallelism.getParallelism(vertex1.getJobVertexID()), 
is(1));
+        assertThat(vertexParallelism.getParallelism(vertex2.getJobVertexID()), 
is(1));
+        assertThat(vertexParallelism.getParallelism(vertex3.getJobVertexID()), 
is(1));
     }
 
     @Test
@@ -118,20 +117,17 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
 
-        final VertexParallelism slotSharingAssignments =
+        final VertexParallelism vertexParallelism =
                 slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
 
-        final Map<JobVertexID, Integer> maxParallelismForVertices =
-                slotSharingAssignments.getMaxParallelismForVertices();
-
         assertThat(
-                maxParallelismForVertices.get(vertex1.getJobVertexID()),
+                vertexParallelism.getParallelism(vertex1.getJobVertexID()),
                 is(vertex1.getParallelism()));
         assertThat(
-                maxParallelismForVertices.get(vertex2.getJobVertexID()),
+                vertexParallelism.getParallelism(vertex2.getJobVertexID()),
                 is(vertex2.getParallelism()));
         assertThat(
-                maxParallelismForVertices.get(vertex3.getJobVertexID()),
+                vertexParallelism.getParallelism(vertex3.getJobVertexID()),
                 is(vertex3.getParallelism()));
     }
 
@@ -153,19 +149,18 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
         TestJobInformation testJobInformation =
                 new TestJobInformation(Arrays.asList(vertex11, vertex12, 
vertex2));
 
-        Map<JobVertexID, Integer> maxParallelismForVertices =
+        VertexParallelism vertexParallelism =
                 slotAllocator
                         .determineParallelism(
                                 testJobInformation,
                                 getSlots(vertex11.getParallelism() + 
vertex2.getParallelism()))
-                        .get()
-                        .getMaxParallelismForVertices();
+                        .get();
 
-        
Assertions.assertThat(maxParallelismForVertices.get(vertex11.getJobVertexID()))
+        
Assertions.assertThat(vertexParallelism.getParallelism(vertex11.getJobVertexID()))
                 .isEqualTo(vertex11.getParallelism());
-        
Assertions.assertThat(maxParallelismForVertices.get(vertex12.getJobVertexID()))
+        
Assertions.assertThat(vertexParallelism.getParallelism(vertex12.getJobVertexID()))
                 .isEqualTo(vertex12.getParallelism());
-        
Assertions.assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()))
+        
Assertions.assertThat(vertexParallelism.getParallelism(vertex2.getJobVertexID()))
                 .isEqualTo(vertex2.getParallelism());
     }
 
@@ -180,10 +175,10 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
 
-        final Optional<VertexParallelismWithSlotSharing> 
slotSharingAssignments =
+        final Optional<VertexParallelism> vertexParallelism =
                 slotAllocator.determineParallelism(jobInformation, 
getSlots(1));
 
-        assertThat(slotSharingAssignments.isPresent(), is(false));
+        assertThat(vertexParallelism.isPresent(), is(false));
     }
 
     @Test
@@ -197,20 +192,23 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
 
-        final VertexParallelismWithSlotSharing slotAssignments =
-                slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+        final JobSchedulingPlan jobSchedulingPlan =
+                slotAllocator
+                        
.determineParallelismAndCalculateAssignment(jobInformation, getSlots(50))
+                        .get();
 
         final ReservedSlots reservedSlots =
                 slotAllocator
-                        .tryReserveResources(slotAssignments)
+                        .tryReserveResources(jobSchedulingPlan)
                         .orElseThrow(
                                 () -> new RuntimeException("Expected that 
reservation succeeds."));
 
         final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new 
HashMap<>();
-        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot 
assignment :
-                slotAssignments.getAssignments()) {
+        for (JobSchedulingPlan.SlotAssignment assignment : 
jobSchedulingPlan.getSlotAssignments()) {
+            ExecutionSlotSharingGroup target =
+                    assignment.getTargetAs(ExecutionSlotSharingGroup.class);
             for (ExecutionVertexID containedExecutionVertex :
-                    
assignment.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
+                    target.getContainedExecutionVertices()) {
                 expectedAssignments.put(containedExecutionVertex, 
assignment.getSlotInfo());
             }
         }
@@ -234,11 +232,13 @@ public class SlotSharingSlotAllocatorTest extends 
TestLogger {
         final JobInformation jobInformation =
                 new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
 
-        final VertexParallelismWithSlotSharing slotAssignments =
-                slotSharingSlotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+        JobSchedulingPlan jobSchedulingPlan =
+                slotSharingSlotAllocator
+                        
.determineParallelismAndCalculateAssignment(jobInformation, getSlots(50))
+                        .get();
 
         final Optional<? extends ReservedSlots> reservedSlots =
-                slotSharingSlotAllocator.tryReserveResources(slotAssignments);
+                
slotSharingSlotAllocator.tryReserveResources(jobSchedulingPlan);
 
         assertFalse(reservedSlots.isPresent());
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
index aa6fd7bad8d..44b73122ae9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import org.apache.flink.runtime.util.ResourceCounter;
 
 import java.util.Collection;
@@ -33,9 +34,7 @@ public class TestingSlotAllocator implements SlotAllocator {
             calculateRequiredSlotsFunction;
 
     private final BiFunction<
-                    JobInformation,
-                    Collection<? extends SlotInfo>,
-                    Optional<? extends VertexParallelism>>
+                    JobInformation, Collection<? extends SlotInfo>, 
Optional<VertexParallelism>>
             determineParallelismFunction;
 
     private final Function<VertexParallelism, Optional<ReservedSlots>> 
tryReserveResourcesFunction;
@@ -43,10 +42,7 @@ public class TestingSlotAllocator implements SlotAllocator {
     private TestingSlotAllocator(
             Function<Iterable<JobInformation.VertexInformation>, 
ResourceCounter>
                     calculateRequiredSlotsFunction,
-            BiFunction<
-                            JobInformation,
-                            Collection<? extends SlotInfo>,
-                            Optional<? extends VertexParallelism>>
+            BiFunction<JobInformation, Collection<? extends SlotInfo>, 
Optional<VertexParallelism>>
                     determineParallelismFunction,
             Function<VertexParallelism, Optional<ReservedSlots>> 
tryReserveResourcesFunction) {
         this.calculateRequiredSlotsFunction = calculateRequiredSlotsFunction;
@@ -61,14 +57,20 @@ public class TestingSlotAllocator implements SlotAllocator {
     }
 
     @Override
-    public Optional<? extends VertexParallelism> determineParallelism(
+    public Optional<VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> 
slots) {
         return determineParallelismFunction.apply(jobInformation, slots);
     }
 
     @Override
-    public Optional<ReservedSlots> tryReserveResources(VertexParallelism 
vertexParallelism) {
-        return tryReserveResourcesFunction.apply(vertexParallelism);
+    public Optional<JobSchedulingPlan> 
determineParallelismAndCalculateAssignment(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
slots) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan 
jobSchedulingPlan) {
+        return 
tryReserveResourcesFunction.apply(jobSchedulingPlan.getVertexParallelism());
     }
 
     public static Builder newBuilder() {
@@ -80,9 +82,7 @@ public class TestingSlotAllocator implements SlotAllocator {
         private Function<Iterable<JobInformation.VertexInformation>, 
ResourceCounter>
                 calculateRequiredSlotsFunction = ignored -> 
ResourceCounter.empty();
         private BiFunction<
-                        JobInformation,
-                        Collection<? extends SlotInfo>,
-                        Optional<? extends VertexParallelism>>
+                        JobInformation, Collection<? extends SlotInfo>, 
Optional<VertexParallelism>>
                 determineParallelismFunction = (ignoredA, ignoredB) -> 
Optional.empty();
         private Function<VertexParallelism, Optional<ReservedSlots>> 
tryReserveResourcesFunction =
                 ignored -> Optional.empty();
@@ -98,7 +98,7 @@ public class TestingSlotAllocator implements SlotAllocator {
                 BiFunction<
                                 JobInformation,
                                 Collection<? extends SlotInfo>,
-                                Optional<? extends VertexParallelism>>
+                                Optional<VertexParallelism>>
                         determineParallelismFunction) {
             this.determineParallelismFunction = determineParallelismFunction;
             return this;

Reply via email to