This is an automated email from the ASF dual-hosted git repository. weizhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 08050f8c26381fef9a9a746643e593443df977a6 Author: Roc Marshal <[email protected]> AuthorDate: Mon Dec 2 23:47:53 2024 +0800 [FLINK-33389][refactor] Extract the assignment logic of task to slot as SlotSharingResolver then refactor the implementation of the default resolver. --- .../adaptive/allocator/AllocatorUtil.java | 27 ------------ .../adaptive/allocator/DefaultSlotAssigner.java | 16 ++++--- ...orUtil.java => DefaultSlotSharingResolver.java} | 51 +++++++++------------- .../adaptive/allocator/SlotSharingResolver.java | 39 +++++++++++++++++ .../allocator/SlotSharingSlotAllocator.java | 31 ++++++++++--- .../allocator/StateLocalitySlotAssigner.java | 21 +++++---- .../allocator/DefaultSlotAssignerTest.java | 5 ++- .../allocator/SlotSharingSlotAllocatorTest.java | 4 +- .../allocator/StateLocalitySlotAssignerTest.java | 2 +- 9 files changed, 111 insertions(+), 85 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java index d516aae2596..24c81ab0fb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java @@ -19,17 +19,10 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.SlotInfo; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; @@ -61,24 +54,4 @@ class AllocatorUtil { 
freeSlots.size(), minimumRequiredSlots); } - - static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> - createExecutionSlotSharingGroups( - VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) { - final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>(); - slotSharingGroup - .getJobVertexIds() - .forEach( - jobVertexId -> { - int parallelism = vertexParallelism.getParallelism(jobVertexId); - for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) { - sharedSlotToVertexAssignment - .computeIfAbsent(subtaskIdx, ignored -> new HashSet<>()) - .add(new ExecutionVertexID(jobVertexId, subtaskIdx)); - } - }); - return sharedSlotToVertexAssignment.values().stream() - .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new) - .collect(Collectors.toList()); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java index 2dc0f6dac56..4adcd4a090b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; @@ -38,7 +37,6 @@ import java.util.stream.Collectors; import static java.util.function.Function.identity; import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots; -import static org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups; /** * Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. Specifically, @@ -53,10 +51,15 @@ public class DefaultSlotAssigner implements SlotAssigner { private final @Nullable String executionTarget; private final boolean minimalTaskManagerPreferred; + private final SlotSharingResolver slotSharingResolver; - DefaultSlotAssigner(@Nullable String executionTarget, boolean minimalTaskManagerPreferred) { + DefaultSlotAssigner( + @Nullable String executionTarget, + boolean minimalTaskManagerPreferred, + SlotSharingResolver slotSharingResolver) { this.executionTarget = executionTarget; this.minimalTaskManagerPreferred = minimalTaskManagerPreferred; + this.slotSharingResolver = slotSharingResolver; } @Override @@ -67,10 +70,9 @@ public class DefaultSlotAssigner implements SlotAssigner { JobAllocationsInformation previousAllocations) { checkMinimumRequiredSlots(jobInformation, freeSlots); - final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>(); - for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { - allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup)); - } + final Collection<ExecutionSlotSharingGroup> allGroups = + slotSharingResolver.getExecutionSlotSharingGroups( + jobInformation, vertexParallelism); final Collection<? 
extends SlotInfo> pickedSlots = pickSlotsIfNeeded(allGroups.size(), freeSlots); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotSharingResolver.java similarity index 60% copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotSharingResolver.java index d516aae2596..d927b973719 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AllocatorUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotSharingResolver.java @@ -18,11 +18,12 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; -import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -31,35 +32,20 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.util.Preconditions.checkState; +/** The default implementation of the {@link SlotSharingResolver}. */ +@Internal +public enum DefaultSlotSharingResolver implements SlotSharingResolver { + INSTANCE; -/** The allocator util class. 
*/ -class AllocatorUtil { - - private AllocatorUtil() {} - - static Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo> - getSlotSharingGroupMetaInfos(JobInformation jobInformation) { - return SlotSharingSlotAllocator.SlotSharingGroupMetaInfo.from(jobInformation.getVertices()); - } - - static int getMinimumRequiredSlots( - Map<SlotSharingGroupId, SlotSharingSlotAllocator.SlotSharingGroupMetaInfo> - slotSharingGroupMetaInfos) { - return slotSharingGroupMetaInfos.values().stream() - .map(SlotSharingSlotAllocator.SlotSharingGroupMetaInfo::getMaxLowerBound) - .reduce(0, Integer::sum); - } - - static void checkMinimumRequiredSlots( - JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) { - final int minimumRequiredSlots = - getMinimumRequiredSlots(getSlotSharingGroupMetaInfos(jobInformation)); - checkState( - freeSlots.size() >= minimumRequiredSlots, - "Not enough slots to allocate all the execution slot sharing groups (have: %s, need: %s)", - freeSlots.size(), - minimumRequiredSlots); + @Override + public Collection<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups( + JobInformation jobInformation, VertexParallelism vertexParallelism) { + final List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> allGroups = + new ArrayList<>(); + for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { + allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup)); + } + return allGroups; } static List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> @@ -78,7 +64,10 @@ class AllocatorUtil { } }); return sharedSlotToVertexAssignment.values().stream() - .map(SlotSharingSlotAllocator.ExecutionSlotSharingGroup::new) + .map( + executionVertexIDs -> + new SlotSharingSlotAllocator.ExecutionSlotSharingGroup( + slotSharingGroup, executionVertexIDs)) .collect(Collectors.toList()); } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingResolver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingResolver.java new file mode 100644 index 00000000000..dc0eb36e350 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingResolver.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; + +import java.util.Collection; + +/** + * The resolver to define how to construct execution slot sharing groups from the execution vertices + * of a job with the vertices parallelisms. + */ +public interface SlotSharingResolver { + /** + * Get the execution slot sharing groups for the job information based on the vertices + * parallelisms. + * + * @param jobInformation the job information. + * @param vertexParallelism the parallelisms for the vertices of the job information. + * @return the all execution slot sharing groups for the deployment of the job. 
+ */ + Collection<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups( + JobInformation jobInformation, VertexParallelism vertexParallelism); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java index 425159cfe0f..58712498224 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.SlotSharingGroupId; @@ -30,6 +31,7 @@ import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -59,6 +61,7 @@ public class SlotSharingSlotAllocator implements SlotAllocator { private final boolean localRecoveryEnabled; private final @Nullable String executionTarget; private final boolean minimalTaskManagerPreferred; + private final SlotSharingResolver slotSharingResolver = DefaultSlotSharingResolver.INSTANCE; private SlotSharingSlotAllocator( ReserveSlotFunction reserveSlot, @@ -150,9 +153,11 @@ public class SlotSharingSlotAllocator implements SlotAllocator { parallelism -> { SlotAssigner slotAssigner = localRecoveryEnabled && !jobAllocationsInformation.isEmpty() - ? 
new StateLocalitySlotAssigner() + ? new StateLocalitySlotAssigner(slotSharingResolver) : new DefaultSlotAssigner( - executionTarget, minimalTaskManagerPreferred); + executionTarget, + minimalTaskManagerPreferred, + slotSharingResolver); return new JobSchedulingPlan( parallelism, slotAssigner.assignSlots( @@ -295,18 +300,30 @@ public class SlotSharingSlotAllocator implements SlotAllocator { slotInfo.getAllocationId(), null, System.currentTimeMillis())); } - static class ExecutionSlotSharingGroup { + /** The execution slot sharing group for adaptive scheduler. */ + public static class ExecutionSlotSharingGroup { private final String id; + private final SlotSharingGroup slotSharingGroup; private final Set<ExecutionVertexID> containedExecutionVertices; - public ExecutionSlotSharingGroup(Set<ExecutionVertexID> containedExecutionVertices) { - this(containedExecutionVertices, UUID.randomUUID().toString()); + public ExecutionSlotSharingGroup( + SlotSharingGroup slotSharingGroup, + Set<ExecutionVertexID> containedExecutionVertices) { + this(UUID.randomUUID().toString(), slotSharingGroup, containedExecutionVertices); } public ExecutionSlotSharingGroup( - Set<ExecutionVertexID> containedExecutionVertices, String id) { - this.containedExecutionVertices = containedExecutionVertices; + String id, + SlotSharingGroup slotSharingGroup, + Set<ExecutionVertexID> containedExecutionVertices) { this.id = id; + this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup); + this.containedExecutionVertices = containedExecutionVertices; + } + + @VisibleForTesting + public SlotSharingGroup getSlotSharingGroup() { + return slotSharingGroup; } public String getId() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java index b9524627508..14b3ef7e555 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation; @@ -36,7 +35,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.stream.Collectors; @@ -44,7 +42,6 @@ import java.util.stream.Collectors; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toMap; import static org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.checkMinimumRequiredSlots; -import static org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil.createExecutionSlotSharingGroups; import static org.apache.flink.util.Preconditions.checkState; /** A {@link SlotAssigner} that assigns slots based on the number of local key groups. 
*/ @@ -90,6 +87,12 @@ public class StateLocalitySlotAssigner implements SlotAssigner { } } + private final SlotSharingResolver slotSharingResolver; + + StateLocalitySlotAssigner(SlotSharingResolver slotSharingResolver) { + this.slotSharingResolver = slotSharingResolver; + } + @Override public Collection<SlotAssignment> assignSlots( JobInformation jobInformation, @@ -98,10 +101,10 @@ public class StateLocalitySlotAssigner implements SlotAssigner { JobAllocationsInformation previousAllocations) { checkMinimumRequiredSlots(jobInformation, freeSlots); - final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>(); - for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { - allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup)); - } + final Collection<ExecutionSlotSharingGroup> allGroups = + slotSharingResolver.getExecutionSlotSharingGroups( + jobInformation, vertexParallelism); + final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups); final PriorityQueue<AllocationScore> scores = calculateScores(jobInformation, previousAllocations, allGroups, parallelism); @@ -140,7 +143,7 @@ public class StateLocalitySlotAssigner implements SlotAssigner { private PriorityQueue<AllocationScore> calculateScores( JobInformation jobInformation, JobAllocationsInformation previousAllocations, - List<ExecutionSlotSharingGroup> allGroups, + Collection<ExecutionSlotSharingGroup> allGroups, Map<JobVertexID, Integer> parallelism) { // PQ orders the pairs (allocationID, groupID) by score, decreasing // the score is computed as the potential amount of state that would reside locally @@ -153,7 +156,7 @@ public class StateLocalitySlotAssigner implements SlotAssigner { } private static Map<JobVertexID, Integer> getParallelism( - List<ExecutionSlotSharingGroup> groups) { + Collection<ExecutionSlotSharingGroup> groups) { final Map<JobVertexID, Integer> parallelism = new HashMap<>(); for (ExecutionSlotSharingGroup group : 
groups) { for (ExecutionVertexID evi : group.getContainedExecutionVertices()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java index 60a1067efb0..86010f1a70f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssignerTest.java @@ -77,7 +77,10 @@ class DefaultSlotAssignerTest { @TestTemplate void testPickSlotsIfNeeded() { final DefaultSlotAssigner slotAssigner = - new DefaultSlotAssigner(APPLICATION_MODE_EXECUTION_TARGET, true); + new DefaultSlotAssigner( + APPLICATION_MODE_EXECUTION_TARGET, + true, + DefaultSlotSharingResolver.INSTANCE); final Set<TaskManagerLocation> keptTaskExecutors = slotAssigner.pickSlotsIfNeeded(parallelism, freeSlots).stream() .map(SlotInfo::getTaskManagerLocation) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java index dc5800b1719..e7ac761804e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java @@ -484,8 +484,8 @@ class SlotSharingSlotAllocatorTest { set.add(id.getJobVertexId()); } } - assertThat(allocated.get(allocation1)).contains(vertex1.getJobVertexID()); - assertThat(allocated.get(allocation1)).contains(vertex2.getJobVertexID()); + assertThat(allocated.get(allocation1)) + .contains(vertex1.getJobVertexID(), vertex2.getJobVertexID()); 
assertThat(allocated.get(allocation2)).contains(vertex3.getJobVertexID()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java index 9be8dc41ec7..bca7c549bdb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java @@ -225,7 +225,7 @@ class StateLocalitySlotAssignerTest { VertexInformation vertexInformation, List<AllocationID> allocationIDs, List<VertexAllocationInformation> allocations) { - return new StateLocalitySlotAssigner() + return new StateLocalitySlotAssigner(DefaultSlotSharingResolver.INSTANCE) .assignSlots( new TestJobInformation(singletonList(vertexInformation)), allocationIDs.stream().map(TestingSlot::new).collect(Collectors.toList()),
