This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 416cb7aaa02c176e01485ff11ab4269f76b5e9e2 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Tue Feb 28 14:22:28 2023 +0000 [FLINK-21450][runtime] Restructure types passed between AdaptiveScheduler and SlotAssigner Slot assignments are computed and consumed by SlotAllocator. This is expressed implicitly by extending VertexParallelism. This change tries to make that clear, while still allowing to assign slots to something other than Slot Sharing Groups. It does so by: 1. Introduce JobSchedulingPlan, computed and consumed by SlotAllocator. It couples VertexParallelism with slot assignments 2. Introduce determineParallelismAndCalculateAssignment method in addition to determineParallelism, specifically for assignments 3. Push the polymorphism of state assignments from VertexParallelism into the JobSchedulingPlan (slot assignment target) --- .../jobgraph/jsonplan/JsonPlanGenerator.java | 21 +--- .../scheduler/adaptive/AdaptiveScheduler.java | 28 +++--- .../scheduler/adaptive/CreatingExecutionGraph.java | 14 ++- .../scheduler/adaptive/JobSchedulingPlan.java | 105 +++++++++++++++++++ .../adaptive/allocator/SlotAllocator.java | 16 ++- .../allocator/SlotSharingSlotAllocator.java | 111 +++++++++------------ .../adaptive/allocator/VertexParallelism.java | 43 ++++++-- .../VertexParallelismWithSlotSharing.java | 52 ---------- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 3 +- .../adaptive/CreatingExecutionGraphTest.java | 29 ++---- .../allocator/SlotSharingSlotAllocatorTest.java | 62 ++++++------ .../adaptive/allocator/TestingSlotAllocator.java | 28 +++--- 12 files changed, 276 insertions(+), 236 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java index 8817c72282b..6cdcc77cc53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java @@ -33,27 +33,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator import org.apache.commons.text.StringEscapeUtils; import java.io.StringWriter; -import java.util.Collections; import java.util.List; -import java.util.Map; @Internal public class JsonPlanGenerator { private static final String NOT_SET = ""; private static final String EMPTY = "{}"; - private static final VertexParallelism EMPTY_VERTEX_PARALLELISM = - new VertexParallelism() { - @Override - public Map<JobVertexID, Integer> getMaxParallelismForVertices() { - return Collections.emptyMap(); - } - - @Override - public int getParallelism(JobVertexID jobVertexId) { - return -1; - } - }; public static String generatePlan(JobGraph jg) { return generatePlan( @@ -61,7 +47,7 @@ public class JsonPlanGenerator { jg.getName(), jg.getJobType(), jg.getVertices(), - EMPTY_VERTEX_PARALLELISM); + VertexParallelism.empty()); } public static String generatePlan( @@ -116,11 +102,12 @@ public class JsonPlanGenerator { // write the core properties JobVertexID vertexID = vertex.getID(); - int storeParallelism = vertexParallelism.getParallelism(vertexID); gen.writeStringField("id", vertexID.toString()); gen.writeNumberField( "parallelism", - storeParallelism != -1 ? 
storeParallelism : vertex.getParallelism()); + vertexParallelism + .getParallelismOptional(vertexID) + .orElse(vertex.getParallelism())); gen.writeStringField("operator", operator); gen.writeStringField("operator_strategy", operatorDescr); gen.writeStringField("description", description); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 0bdbb44da7c..a876bb65eb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -760,11 +760,12 @@ public class AdaptiveScheduler .isPresent(); } - private VertexParallelism determineParallelism(SlotAllocator slotAllocator) + private JobSchedulingPlan determineParallelism(SlotAllocator slotAllocator) throws NoResourceAvailableException { return slotAllocator - .determineParallelism(jobInformation, declarativeSlotPool.getFreeSlotsInformation()) + .determineParallelismAndCalculateAssignment( + jobInformation, declarativeSlotPool.getFreeSlotsInformation()) .orElseThrow( () -> new NoResourceAvailableException( @@ -932,11 +933,11 @@ public class AdaptiveScheduler private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> createExecutionGraphWithAvailableResourcesAsync() { - final VertexParallelism vertexParallelism; + final JobSchedulingPlan schedulingPlan; final VertexParallelismStore adjustedParallelismStore; try { - vertexParallelism = determineParallelism(slotAllocator); + schedulingPlan = determineParallelism(slotAllocator); JobGraph adjustedJobGraph = jobInformation.copyJobGraph(); for (JobVertex vertex : adjustedJobGraph.getVertices()) { @@ -944,7 +945,7 @@ public class AdaptiveScheduler // use the determined "available parallelism" to use // the resources we have access to - 
vertex.setParallelism(vertexParallelism.getParallelism(id)); + vertex.setParallelism(schedulingPlan.getVertexParallelism().getParallelism(id)); } // use the originally configured max parallelism @@ -966,7 +967,7 @@ public class AdaptiveScheduler .thenApply( executionGraph -> CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( - executionGraph, vertexParallelism)); + executionGraph, schedulingPlan)); } @Override @@ -982,10 +983,10 @@ public class AdaptiveScheduler executionGraph.setInternalTaskFailuresListener( new UpdateSchedulerNgOnInternalFailuresListener(this)); - final VertexParallelism vertexParallelism = - executionGraphWithVertexParallelism.getVertexParallelism(); + final JobSchedulingPlan jobSchedulingPlan = + executionGraphWithVertexParallelism.getJobSchedulingPlan(); return slotAllocator - .tryReserveResources(vertexParallelism) + .tryReserveResources(jobSchedulingPlan) .map( reservedSlots -> CreatingExecutionGraph.AssignmentResult.success( @@ -1050,14 +1051,14 @@ public class AdaptiveScheduler int availableSlots = declarativeSlotPool.getFreeSlotsInformation().size(); if (availableSlots > 0) { - final Optional<? 
extends VertexParallelism> potentialNewParallelism = + final Optional<VertexParallelism> potentialNewParallelism = slotAllocator.determineParallelism( jobInformation, declarativeSlotPool.getAllSlotsInformation()); if (potentialNewParallelism.isPresent()) { int currentCumulativeParallelism = getCurrentCumulativeParallelism(executionGraph); int newCumulativeParallelism = - getCumulativeParallelism(potentialNewParallelism.get()); + potentialNewParallelism.get().getCumulativeParallelism(); if (newCumulativeParallelism > currentCumulativeParallelism) { LOG.debug( "Offering scale up to scale up controller with currentCumulativeParallelism={}, newCumulativeParallelism={}", @@ -1077,11 +1078,6 @@ public class AdaptiveScheduler .reduce(0, Integer::sum); } - private static int getCumulativeParallelism(VertexParallelism potentialNewParallelism) { - return potentialNewParallelism.getMaxParallelismForVertices().values().stream() - .reduce(0, Integer::sum); - } - @Override public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java index c87e58b2971..f19145f0844 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java @@ -339,16 +339,16 @@ public class CreatingExecutionGraph implements State { static class ExecutionGraphWithVertexParallelism { private final ExecutionGraph executionGraph; - private final VertexParallelism vertexParallelism; + private final JobSchedulingPlan jobSchedulingPlan; private ExecutionGraphWithVertexParallelism( - ExecutionGraph executionGraph, VertexParallelism vertexParallelism) { + ExecutionGraph executionGraph, JobSchedulingPlan jobSchedulingPlan) { this.executionGraph 
= executionGraph; - this.vertexParallelism = vertexParallelism; + this.jobSchedulingPlan = jobSchedulingPlan; } public static ExecutionGraphWithVertexParallelism create( - ExecutionGraph executionGraph, VertexParallelism vertexParallelism) { + ExecutionGraph executionGraph, JobSchedulingPlan vertexParallelism) { return new ExecutionGraphWithVertexParallelism(executionGraph, vertexParallelism); } @@ -357,7 +357,11 @@ public class CreatingExecutionGraph implements State { } public VertexParallelism getVertexParallelism() { - return vertexParallelism; + return jobSchedulingPlan.getVertexParallelism(); + } + + public JobSchedulingPlan getJobSchedulingPlan() { + return jobSchedulingPlan; } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobSchedulingPlan.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobSchedulingPlan.java new file mode 100644 index 00000000000..cde53948d16 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobSchedulingPlan.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; + +import java.util.Collection; +import java.util.Collections; + +/** + * A plan that describes how to execute {@link org.apache.flink.runtime.jobgraph.JobGraph JobGraph}. + * + * <ol> + * <li>{@link #vertexParallelism} is necessary to create {@link + * org.apache.flink.runtime.executiongraph.ExecutionGraph ExecutionGraph} + * <li>{@link #slotAssignments} are used to schedule it onto the cluster + * </ol> + * + * {@link AdaptiveScheduler} passes this structure from {@link WaitingForResources} to {@link + * CreatingExecutionGraph} stages. + */ +@Internal +public class JobSchedulingPlan { + private final VertexParallelism vertexParallelism; + private final Collection<SlotAssignment> slotAssignments; + + public JobSchedulingPlan( + VertexParallelism vertexParallelism, Collection<SlotAssignment> slotAssignments) { + this.vertexParallelism = vertexParallelism; + this.slotAssignments = slotAssignments; + } + + public VertexParallelism getVertexParallelism() { + return vertexParallelism; + } + + public Collection<SlotAssignment> getSlotAssignments() { + return slotAssignments; + } + + /** + * Create an empty {@link JobSchedulingPlan} with no information about vertices or allocations. + */ + public static JobSchedulingPlan empty() { + return new JobSchedulingPlan(VertexParallelism.empty(), Collections.emptyList()); + } + + /** Assignment of a slot to some target (e.g. a slot sharing group). */ + public static class SlotAssignment { + private final SlotInfo slotInfo; + /** + * Interpreted by {@link + * org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator#tryReserveResources(JobSchedulingPlan)}. + * This can be a slot sharing group, a task, or something else. 
+ */ + private final Object target; + + public SlotAssignment(SlotInfo slotInfo, Object target) { + this.slotInfo = slotInfo; + this.target = target; + } + + public SlotInfo getSlotInfo() { + return slotInfo; + } + + public Object getTarget() { + return target; + } + + public <T> T getTargetAs(Class<T> clazz) { + return (T) getTarget(); + } + + @Override + public String toString() { + return String.format( + "SlotAssignment: %s, target: %s", slotInfo.getAllocationId(), target); + } + } + + @Override + public String toString() { + return String.format( + "JobSchedulingPlan: parallelism: %s, assignments: %s", + vertexParallelism, slotAssignments); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java index 9112b52e876..fea0d659ad3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; import org.apache.flink.runtime.util.ResourceCounter; import java.util.Collection; @@ -43,7 +44,7 @@ public interface SlotAllocator { * given job information. * * <p>Implementations of this method must be side-effect free. There is no guarantee that the - * result of this method is ever passed to {@link #tryReserveResources(VertexParallelism)}. + * result of this method is ever passed to {@link #tryReserveResources(JobSchedulingPlan)}. 
* * @param jobInformation information about the job graph * @param slots slots to consider for determining the parallelism @@ -51,7 +52,14 @@ public interface SlotAllocator { * how the vertices could be assigned to slots, if all vertices could be run with the given * slots */ - Optional<? extends VertexParallelism> determineParallelism( + Optional<VertexParallelism> determineParallelism( + JobInformation jobInformation, Collection<? extends SlotInfo> slots); + + /** + * Same as {@link #determineParallelism(JobInformation, Collection)} but additionally determines the + * assignment of slots to execution slot sharing groups. + */ + Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment( JobInformation jobInformation, Collection<? extends SlotInfo> slots); /** @@ -59,9 +67,9 @@ public interface SlotAllocator { * resources has changed and the reservation with respect to vertexParallelism is no longer * possible, then this method returns {@link Optional#empty()}. * - * @param vertexParallelism information on how slots should be assigned to the slots + * @param jobSchedulingPlan information on how slots should be assigned to their targets * @return Set of reserved slots if the reservation was successful; otherwise {@link * Optional#empty()} */ - Optional<ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism); + Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan jobSchedulingPlan); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java index ce4c6bfb1af..2b7a9c362ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java @@ -26,9 +26,10 @@ import 
org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.util.ResourceCounter; -import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -93,7 +94,7 @@ public class SlotSharingSlotAllocator implements SlotAllocator { } @Override - public Optional<VertexParallelismWithSlotSharing> determineParallelism( + public Optional<VertexParallelism> determineParallelism( JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) { // => less slots than slot-sharing groups @@ -104,9 +105,6 @@ public class SlotSharingSlotAllocator implements SlotAllocator { final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = determineSlotsPerSharingGroup(jobInformation, freeSlots.size()); - final Iterator<? 
extends SlotInfo> slotIterator = freeSlots.iterator(); - - final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new ArrayList<>(); final Map<JobVertexID, Integer> allVertexParallelism = new HashMap<>(); for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { @@ -120,21 +118,37 @@ public class SlotSharingSlotAllocator implements SlotAllocator { containedJobVertices, slotSharingGroupParallelism.get( slotSharingGroup.getSlotSharingGroupId())); + allVertexParallelism.putAll(vertexParallelism); + } + return Optional.of(new VertexParallelism(allVertexParallelism)); + } - final Iterable<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment = - createExecutionSlotSharingGroups(vertexParallelism); - - for (ExecutionSlotSharingGroup executionSlotSharingGroup : - sharedSlotToVertexAssignment) { - final SlotInfo slotInfo = slotIterator.next(); + @Override + public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment( + JobInformation jobInformation, Collection<? extends SlotInfo> slots) { + return determineParallelism(jobInformation, slots) + .map( + parallelism -> + new JobSchedulingPlan( + parallelism, + assignSlots(jobInformation, slots, parallelism))); + } - assignments.add( - new ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo)); - } - allVertexParallelism.putAll(vertexParallelism); + private Collection<SlotAssignment> assignSlots( + JobInformation jobInformation, + Collection<? extends SlotInfo> freeSlots, + VertexParallelism vertexParallelism) { + List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>(); + for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { + allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup)); } - return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments)); + Iterator<? 
extends SlotInfo> iterator = freeSlots.iterator(); + Collection<SlotAssignment> assignments = new ArrayList<>(); + for (ExecutionSlotSharingGroup group : allGroups) { + assignments.add(new SlotAssignment(iterator.next(), group)); + } + return assignments; } /** @@ -186,15 +200,16 @@ public class SlotSharingSlotAllocator implements SlotAllocator { return vertexParallelism; } - private static Iterable<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups( - Map<JobVertexID, Integer> containedJobVertices) { + private static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups( + VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) { final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>(); - for (Map.Entry<JobVertexID, Integer> jobVertex : containedJobVertices.entrySet()) { - for (int i = 0; i < jobVertex.getValue(); i++) { + for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) { + int parallelism = vertexParallelism.getParallelism(jobVertexId); + for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) { sharedSlotToVertexAssignment - .computeIfAbsent(i, ignored -> new HashSet<>()) - .add(new ExecutionVertexID(jobVertex.getKey(), i)); + .computeIfAbsent(subtaskIdx, ignored -> new HashSet<>()) + .add(new ExecutionVertexID(jobVertexId, subtaskIdx)); } } @@ -204,34 +219,20 @@ public class SlotSharingSlotAllocator implements SlotAllocator { } @Override - public Optional<ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism) { - Preconditions.checkArgument( - vertexParallelism instanceof VertexParallelismWithSlotSharing, - String.format( - "%s expects %s as argument.", - SlotSharingSlotAllocator.class.getSimpleName(), - VertexParallelismWithSlotSharing.class.getSimpleName())); - - final VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing = - (VertexParallelismWithSlotSharing) vertexParallelism; - + public Optional<ReservedSlots> 
tryReserveResources(JobSchedulingPlan jobSchedulingPlan) { final Collection<AllocationID> expectedSlots = - calculateExpectedSlots(vertexParallelismWithSlotSharing.getAssignments()); + calculateExpectedSlots(jobSchedulingPlan.getSlotAssignments()); if (areAllExpectedSlotsAvailableAndFree(expectedSlots)) { final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new HashMap<>(); - for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup : - vertexParallelismWithSlotSharing.getAssignments()) { - final SharedSlot sharedSlot = - reserveSharedSlot(executionSlotSharingGroup.getSlotInfo()); - + for (SlotAssignment assignment : jobSchedulingPlan.getSlotAssignments()) { + final SharedSlot sharedSlot = reserveSharedSlot(assignment.getSlotInfo()); for (ExecutionVertexID executionVertexId : - executionSlotSharingGroup - .getExecutionSlotSharingGroup() + assignment + .getTargetAs(ExecutionSlotSharingGroup.class) .getContainedExecutionVertices()) { - final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot(); - assignedSlots.put(executionVertexId, logicalSlot); + assignedSlots.put(executionVertexId, sharedSlot.allocateLogicalSlot()); } } @@ -242,11 +243,10 @@ public class SlotSharingSlotAllocator implements SlotAllocator { } @Nonnull - private Collection<AllocationID> calculateExpectedSlots( - Iterable<? 
extends ExecutionSlotSharingGroupAndSlot> assignments) { + private Collection<AllocationID> calculateExpectedSlots(Iterable<SlotAssignment> assignments) { final Collection<AllocationID> requiredSlots = new ArrayList<>(); - for (ExecutionSlotSharingGroupAndSlot assignment : assignments) { + for (SlotAssignment assignment : assignments) { requiredSlots.add(assignment.getSlotInfo().getAllocationId()); } return requiredSlots; @@ -288,23 +288,4 @@ public class SlotSharingSlotAllocator implements SlotAllocator { return containedExecutionVertices; } } - - static class ExecutionSlotSharingGroupAndSlot { - private final ExecutionSlotSharingGroup executionSlotSharingGroup; - private final SlotInfo slotInfo; - - public ExecutionSlotSharingGroupAndSlot( - ExecutionSlotSharingGroup executionSlotSharingGroup, SlotInfo slotInfo) { - this.executionSlotSharingGroup = executionSlotSharingGroup; - this.slotInfo = slotInfo; - } - - public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() { - return executionSlotSharingGroup; - } - - public SlotInfo getSlotInfo() { - return slotInfo; - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java index 0a19a3217ed..dcc3c10fd18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelism.java @@ -20,17 +20,44 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.jobgraph.JobVertexID; import java.util.Collection; +import java.util.Collections; import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkArgument; /** - * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)}, 
describing - * the parallelism each vertex could be scheduled with. - * - * <p>{@link SlotAllocator} implementations may encode additional information to be used in {@link - * SlotAllocator#tryReserveResources(VertexParallelism)}. + * Core result of {@link SlotAllocator#determineParallelism(JobInformation, Collection)} along with + * {@link org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment + * slotAssignments}, describing the parallelism each vertex could be scheduled with. */ -public interface VertexParallelism { - Map<JobVertexID, Integer> getMaxParallelismForVertices(); +public class VertexParallelism { + private final Map<JobVertexID, Integer> parallelismForVertices; + + public VertexParallelism(Map<JobVertexID, Integer> parallelismForVertices) { + this.parallelismForVertices = parallelismForVertices; + } + + public int getParallelism(JobVertexID jobVertexId) { + checkArgument( + parallelismForVertices.containsKey(jobVertexId), "Unknown vertex: " + jobVertexId); + return parallelismForVertices.get(jobVertexId); + } + + public Optional<Integer> getParallelismOptional(JobVertexID jobVertexId) { + return Optional.ofNullable(parallelismForVertices.get(jobVertexId)); + } + + public int getCumulativeParallelism() { + return parallelismForVertices.values().stream().reduce(0, Integer::sum); + } + + @Override + public String toString() { + return "VertexParallelism: " + parallelismForVertices; + } - int getParallelism(JobVertexID jobVertexId); + public static VertexParallelism empty() { + return new VertexParallelism(Collections.emptyMap()); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java deleted file mode 100644 index 05bcadf2728..00000000000 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/VertexParallelismWithSlotSharing.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.scheduler.adaptive.allocator; - -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.util.Preconditions; - -import java.util.Collection; -import java.util.Map; - -/** {@link VertexParallelism} implementation for the {@link SlotSharingSlotAllocator}. 
*/ -public class VertexParallelismWithSlotSharing implements VertexParallelism { - - private final Map<JobVertexID, Integer> vertexParallelism; - private final Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> assignments; - - VertexParallelismWithSlotSharing( - Map<JobVertexID, Integer> vertexParallelism, - Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> assignments) { - this.vertexParallelism = vertexParallelism; - this.assignments = Preconditions.checkNotNull(assignments); - } - - Iterable<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> getAssignments() { - return assignments; - } - - @Override - public Map<JobVertexID, Integer> getMaxParallelismForVertices() { - return vertexParallelism; - } - - @Override - public int getParallelism(JobVertexID jobVertexId) { - return vertexParallelism.get(jobVertexId); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index e6d457a8ad7..978750ecf3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -1408,8 +1408,7 @@ public class AdaptiveSchedulerTest extends TestLogger { final CreatingExecutionGraph.AssignmentResult assignmentResult = adaptiveScheduler.tryToAssignSlots( CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( - new StateTrackingMockExecutionGraph(), - new CreatingExecutionGraphTest.TestingVertexParallelism())); + new StateTrackingMockExecutionGraph(), JobSchedulingPlan.empty())); assertThat(assignmentResult.isSuccess()).isFalse(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java index 85a4fe4f3ab..54ebdc11702 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java @@ -25,13 +25,11 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.GlobalFailureHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; -import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -45,7 +43,6 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -158,8 +155,7 @@ public class CreatingExecutionGraphTest extends TestLogger { context.setExpectWaitingForResources(); executionGraphWithVertexParallelismFuture.complete( - CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( - new StateTrackingMockExecutionGraph(), new TestingVertexParallelism())); + getGraph(new StateTrackingMockExecutionGraph())); } } @@ -183,9 +179,7 @@ public class 
CreatingExecutionGraphTest extends TestLogger { actualExecutionGraph -> assertThat(actualExecutionGraph).isEqualTo(executionGraph)); - executionGraphWithVertexParallelismFuture.complete( - CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( - executionGraph, new TestingVertexParallelism())); + executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph)); } } @@ -214,9 +208,7 @@ public class CreatingExecutionGraphTest extends TestLogger { actualExecutionGraph -> assertThat(actualExecutionGraph).isEqualTo(executionGraph)); - executionGraphWithVertexParallelismFuture.complete( - CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( - executionGraph, new TestingVertexParallelism())); + executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph)); assertThat(operatorCoordinatorGlobalFailureHandlerRef.get()).isSameAs(context); } @@ -353,16 +345,9 @@ public class CreatingExecutionGraphTest extends TestLogger { } } - static final class TestingVertexParallelism implements VertexParallelism { - - @Override - public Map<JobVertexID, Integer> getMaxParallelismForVertices() { - throw new UnsupportedOperationException("Is not supported"); - } - - @Override - public int getParallelism(JobVertexID jobVertexId) { - throw new UnsupportedOperationException("Is not supported"); - } + private static CreatingExecutionGraph.ExecutionGraphWithVertexParallelism getGraph( + StateTrackingMockExecutionGraph executionGraph) { + return CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( + executionGraph, JobSchedulingPlan.empty()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java index 8be500919f1..7643acc7410 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.TestLogger; @@ -96,15 +98,12 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { final JobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)); - final VertexParallelism slotSharingAssignments = + final VertexParallelism vertexParallelism = slotAllocator.determineParallelism(jobInformation, getSlots(2)).get(); - final Map<JobVertexID, Integer> maxParallelismForVertices = - slotSharingAssignments.getMaxParallelismForVertices(); - - assertThat(maxParallelismForVertices.get(vertex1.getJobVertexID()), is(1)); - assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()), is(1)); - assertThat(maxParallelismForVertices.get(vertex3.getJobVertexID()), is(1)); + assertThat(vertexParallelism.getParallelism(vertex1.getJobVertexID()), is(1)); + assertThat(vertexParallelism.getParallelism(vertex2.getJobVertexID()), is(1)); + assertThat(vertexParallelism.getParallelism(vertex3.getJobVertexID()), is(1)); } @Test @@ -118,20 +117,17 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { final JobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)); 
- final VertexParallelism slotSharingAssignments = + final VertexParallelism vertexParallelism = slotAllocator.determineParallelism(jobInformation, getSlots(50)).get(); - final Map<JobVertexID, Integer> maxParallelismForVertices = - slotSharingAssignments.getMaxParallelismForVertices(); - assertThat( - maxParallelismForVertices.get(vertex1.getJobVertexID()), + vertexParallelism.getParallelism(vertex1.getJobVertexID()), is(vertex1.getParallelism())); assertThat( - maxParallelismForVertices.get(vertex2.getJobVertexID()), + vertexParallelism.getParallelism(vertex2.getJobVertexID()), is(vertex2.getParallelism())); assertThat( - maxParallelismForVertices.get(vertex3.getJobVertexID()), + vertexParallelism.getParallelism(vertex3.getJobVertexID()), is(vertex3.getParallelism())); } @@ -153,19 +149,18 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { TestJobInformation testJobInformation = new TestJobInformation(Arrays.asList(vertex11, vertex12, vertex2)); - Map<JobVertexID, Integer> maxParallelismForVertices = + VertexParallelism vertexParallelism = slotAllocator .determineParallelism( testJobInformation, getSlots(vertex11.getParallelism() + vertex2.getParallelism())) - .get() - .getMaxParallelismForVertices(); + .get(); - Assertions.assertThat(maxParallelismForVertices.get(vertex11.getJobVertexID())) + Assertions.assertThat(vertexParallelism.getParallelism(vertex11.getJobVertexID())) .isEqualTo(vertex11.getParallelism()); - Assertions.assertThat(maxParallelismForVertices.get(vertex12.getJobVertexID())) + Assertions.assertThat(vertexParallelism.getParallelism(vertex12.getJobVertexID())) .isEqualTo(vertex12.getParallelism()); - Assertions.assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID())) + Assertions.assertThat(vertexParallelism.getParallelism(vertex2.getJobVertexID())) .isEqualTo(vertex2.getParallelism()); } @@ -180,10 +175,10 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { final JobInformation jobInformation = new 
TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)); - final Optional<VertexParallelismWithSlotSharing> slotSharingAssignments = + final Optional<VertexParallelism> vertexParallelism = slotAllocator.determineParallelism(jobInformation, getSlots(1)); - assertThat(slotSharingAssignments.isPresent(), is(false)); + assertThat(vertexParallelism.isPresent(), is(false)); } @Test @@ -197,20 +192,23 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { final JobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)); - final VertexParallelismWithSlotSharing slotAssignments = - slotAllocator.determineParallelism(jobInformation, getSlots(50)).get(); + final JobSchedulingPlan jobSchedulingPlan = + slotAllocator + .determineParallelismAndCalculateAssignment(jobInformation, getSlots(50)) + .get(); final ReservedSlots reservedSlots = slotAllocator - .tryReserveResources(slotAssignments) + .tryReserveResources(jobSchedulingPlan) .orElseThrow( () -> new RuntimeException("Expected that reservation succeeds.")); final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new HashMap<>(); - for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot assignment : - slotAssignments.getAssignments()) { + for (JobSchedulingPlan.SlotAssignment assignment : jobSchedulingPlan.getSlotAssignments()) { + ExecutionSlotSharingGroup target = + assignment.getTargetAs(ExecutionSlotSharingGroup.class); for (ExecutionVertexID containedExecutionVertex : - assignment.getExecutionSlotSharingGroup().getContainedExecutionVertices()) { + target.getContainedExecutionVertices()) { expectedAssignments.put(containedExecutionVertex, assignment.getSlotInfo()); } } @@ -234,11 +232,13 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { final JobInformation jobInformation = new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)); - final VertexParallelismWithSlotSharing slotAssignments = - 
slotSharingSlotAllocator.determineParallelism(jobInformation, getSlots(50)).get(); + JobSchedulingPlan jobSchedulingPlan = + slotSharingSlotAllocator + .determineParallelismAndCalculateAssignment(jobInformation, getSlots(50)) + .get(); final Optional<? extends ReservedSlots> reservedSlots = - slotSharingSlotAllocator.tryReserveResources(slotAssignments); + slotSharingSlotAllocator.tryReserveResources(jobSchedulingPlan); assertFalse(reservedSlots.isPresent()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java index aa6fd7bad8d..44b73122ae9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; import org.apache.flink.runtime.util.ResourceCounter; import java.util.Collection; @@ -33,9 +34,7 @@ public class TestingSlotAllocator implements SlotAllocator { calculateRequiredSlotsFunction; private final BiFunction< - JobInformation, - Collection<? extends SlotInfo>, - Optional<? extends VertexParallelism>> + JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>> determineParallelismFunction; private final Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction; @@ -43,10 +42,7 @@ public class TestingSlotAllocator implements SlotAllocator { private TestingSlotAllocator( Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction, - BiFunction< - JobInformation, - Collection<? extends SlotInfo>, - Optional<? 
extends VertexParallelism>> + BiFunction<JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>> determineParallelismFunction, Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) { this.calculateRequiredSlotsFunction = calculateRequiredSlotsFunction; @@ -61,14 +57,20 @@ public class TestingSlotAllocator implements SlotAllocator { } @Override - public Optional<? extends VertexParallelism> determineParallelism( + public Optional<VertexParallelism> determineParallelism( JobInformation jobInformation, Collection<? extends SlotInfo> slots) { return determineParallelismFunction.apply(jobInformation, slots); } @Override - public Optional<ReservedSlots> tryReserveResources(VertexParallelism vertexParallelism) { - return tryReserveResourcesFunction.apply(vertexParallelism); + public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment( + JobInformation jobInformation, Collection<? extends SlotInfo> slots) { + return Optional.empty(); + } + + @Override + public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan jobSchedulingPlan) { + return tryReserveResourcesFunction.apply(jobSchedulingPlan.getVertexParallelism()); } public static Builder newBuilder() { @@ -80,9 +82,7 @@ public class TestingSlotAllocator implements SlotAllocator { private Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction = ignored -> ResourceCounter.empty(); private BiFunction< - JobInformation, - Collection<? extends SlotInfo>, - Optional<? extends VertexParallelism>> + JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>> determineParallelismFunction = (ignoredA, ignoredB) -> Optional.empty(); private Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction = ignored -> Optional.empty(); @@ -98,7 +98,7 @@ public class TestingSlotAllocator implements SlotAllocator { BiFunction< JobInformation, Collection<? 
extends SlotInfo>, - Optional<? extends VertexParallelism>> + Optional<VertexParallelism>> determineParallelismFunction) { this.determineParallelismFunction = determineParallelismFunction; return this;