This is an automated email from the ASF dual-hosted git repository. weizhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 252f6521aa8f74dbdb96f1e789c37dcb2da59138 Author: Roc Marshal <[email protected]> AuthorDate: Mon Dec 2 22:38:31 2024 +0800 [FLINK-33389][refactor] Make TaskBalancedExecutionSlotSharingGroupBuilder as a separated class. --- .../scheduler/AbstractSlotSharingStrategy.java | 19 +- .../LocalInputPreferredSlotSharingStrategy.java | 3 +- ...skBalancedExecutionSlotSharingGroupBuilder.java | 263 +++++++++++++++++++++ .../TaskBalancedPreferredSlotSharingStrategy.java | 247 +------------------ 4 files changed, 282 insertions(+), 250 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java index d13c62909b3..27485fcd777 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java @@ -34,6 +34,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -93,17 +94,23 @@ abstract class AbstractSlotSharingStrategy /** * The vertices are topologically sorted since {@link DefaultExecutionTopology#getVertices} are * topologically sorted. + * + * @param topology The job topology. + * @param executionVertexIdentifierMapper The vertex identifier mapper function to convert an + * execution vertex to a target identifier. + * @return The vertex identifiers in topological order. + * @param <T> The type of the execution vertex identifier. 
*/ @Nonnull - static LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> getExecutionVertices( - SchedulingTopology topology) { - final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> vertices = - new LinkedHashMap<>(); + static <T> LinkedHashMap<JobVertexID, List<T>> getExecutionVertices( + SchedulingTopology topology, + Function<SchedulingExecutionVertex, T> executionVertexIdentifierMapper) { + final LinkedHashMap<JobVertexID, List<T>> vertices = new LinkedHashMap<>(); for (SchedulingExecutionVertex executionVertex : topology.getVertices()) { - final List<SchedulingExecutionVertex> executionVertexGroup = + final List<T> executionVertexGroup = vertices.computeIfAbsent( executionVertex.getId().getJobVertexId(), k -> new ArrayList<>()); - executionVertexGroup.add(executionVertex); + executionVertexGroup.add(executionVertexIdentifierMapper.apply(executionVertex)); } return vertices; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java index f4c0e854512..8a57e6b5cdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -168,7 +169,7 @@ class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy */ private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() { final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices = - getExecutionVertices(topology); + getExecutionVertices(topology, Function.identity()); // loop on job vertices so that an 
execution vertex will not be added into a group // if that group better fits another execution vertex diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedExecutionSlotSharingGroupBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedExecutionSlotSharingGroupBuilder.java new file mode 100644 index 00000000000..a6ea8df5181 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedExecutionSlotSharingGroupBuilder.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Builder of execution slot sharing groups based on the balanced scheduling strategy. */ +public class TaskBalancedExecutionSlotSharingGroupBuilder { + + public static final Logger LOG = + LoggerFactory.getLogger(TaskBalancedExecutionSlotSharingGroupBuilder.class); + + private final Map<JobVertexID, List<ExecutionVertexID>> allVertices; + + private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap; + + /** Record the {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. */ + private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> + paralleledExecutionSlotSharingGroupsMap; + + /** + * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link + * SlotSharingGroup}s. 
+ */ + private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap; + + private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap; + + private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap; + + private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> + constraintToExecutionSlotSharingGroupMap; + + public TaskBalancedExecutionSlotSharingGroupBuilder( + final Map<JobVertexID, List<ExecutionVertexID>> allVertices, + final Collection<SlotSharingGroup> slotSharingGroups, + final Collection<CoLocationGroup> coLocationGroups) { + this.allVertices = checkNotNull(allVertices); + + this.coLocationGroupMap = new HashMap<>(); + for (CoLocationGroup coLocationGroup : coLocationGroups) { + for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) { + coLocationGroupMap.put(jobVertexId, coLocationGroup); + } + } + + this.constraintToExecutionSlotSharingGroupMap = new HashMap<>(); + this.paralleledExecutionSlotSharingGroupsMap = new HashMap<>(slotSharingGroups.size()); + this.slotSharingGroupIndexMap = new HashMap<>(slotSharingGroups.size()); + this.slotSharingGroupMap = new HashMap<>(); + this.executionSlotSharingGroupMap = new HashMap<>(); + + for (SlotSharingGroup slotSharingGroup : slotSharingGroups) { + for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) { + slotSharingGroupMap.put(jobVertexId, slotSharingGroup); + } + } + } + + public Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() { + + initParalleledExecutionSlotSharingGroupsMap(allVertices); + + // Loop on job vertices + for (Map.Entry<JobVertexID, List<ExecutionVertexID>> executionVertexInfos : + allVertices.entrySet()) { + + JobVertexID jobVertexID = executionVertexInfos.getKey(); + List<ExecutionVertexID> executionVertexIds = executionVertexInfos.getValue(); + final SlotSharingGroup slotSharingGroup = slotSharingGroupMap.get(jobVertexID); + + if (!coLocationGroupMap.containsKey(jobVertexID)) { + // For vertices without 
CoLocationConstraint. + allocateNonCoLocatedVertices(slotSharingGroup, executionVertexIds); + } else { + // For vertices with CoLocationConstraint. + allocateCoLocatedVertices(slotSharingGroup, executionVertexIds); + } + } + return executionSlotSharingGroupMap; + } + + private void initParalleledExecutionSlotSharingGroupsMap( + final Map<JobVertexID, List<ExecutionVertexID>> allVertices) { + + allVertices.entrySet().stream() + .map( + jobVertexExecutionVertices -> + Tuple2.of( + slotSharingGroupMap.get( + jobVertexExecutionVertices.getKey()), + jobVertexExecutionVertices.getValue().size())) + .collect( + Collectors.groupingBy( + tuple -> tuple.f0, Collectors.summarizingInt(tuple -> tuple.f1))) + .forEach( + (slotSharingGroup, statistics) -> { + int slotNum = statistics.getMax(); + paralleledExecutionSlotSharingGroupsMap.put( + slotSharingGroup, + createExecutionSlotSharingGroups(slotSharingGroup, slotNum)); + }); + } + + private List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups( + SlotSharingGroup slotSharingGroup, int slotNum) { + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = new ArrayList<>(slotNum); + for (int i = 0; i < slotNum; i++) { + final ExecutionSlotSharingGroup executionSlotSharingGroup = + new ExecutionSlotSharingGroup(slotSharingGroup); + executionSlotSharingGroups.add(i, executionSlotSharingGroup); + LOG.debug( + "Create {}-th(st/nd) executionSlotSharingGroup {}.", + i, + executionSlotSharingGroup); + } + return executionSlotSharingGroups; + } + + private void allocateCoLocatedVertices( + SlotSharingGroup slotSharingGroup, List<ExecutionVertexID> executionVertexIds) { + + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = + paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); + for (ExecutionVertexID executionVertexId : executionVertexIds) { + final CoLocationConstraint coLocationConstraint = + getCoLocationConstraint(executionVertexId); + ExecutionSlotSharingGroup 
executionSlotSharingGroup = + constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint); + if (Objects.isNull(executionSlotSharingGroup)) { + executionSlotSharingGroup = + executionSlotSharingGroups.get( + getLeastUtilizeSlotIndex( + executionSlotSharingGroups, executionVertexId)); + constraintToExecutionSlotSharingGroupMap.put( + coLocationConstraint, executionSlotSharingGroup); + } + addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, executionVertexId); + } + final int jobVertexParallel = executionVertexIds.size(); + if (!isMaxParallelism(jobVertexParallel, slotSharingGroup)) { + int index = getLeastUtilizeSlotIndex(executionSlotSharingGroups, null); + updateSlotRoundRobinIndexIfNeeded(jobVertexParallel, slotSharingGroup, index); + } + } + + private void allocateNonCoLocatedVertices( + SlotSharingGroup slotSharingGroup, List<ExecutionVertexID> executionVertices) { + final int jobVertexParallel = executionVertices.size(); + int index = getSlotRoundRobinIndex(jobVertexParallel, slotSharingGroup); + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = + paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); + for (ExecutionVertexID executionVertexId : executionVertices) { + addVertexToExecutionSlotSharingGroup( + executionSlotSharingGroups.get(index), executionVertexId); + index = ++index % executionSlotSharingGroups.size(); + } + updateSlotRoundRobinIndexIfNeeded(executionVertices.size(), slotSharingGroup, index); + } + + private void addVertexToExecutionSlotSharingGroup( + ExecutionSlotSharingGroup executionSlotSharingGroup, + ExecutionVertexID executionVertexId) { + executionSlotSharingGroup.addVertex(executionVertexId); + executionSlotSharingGroupMap.put(executionVertexId, executionSlotSharingGroup); + } + + private CoLocationConstraint getCoLocationConstraint(ExecutionVertexID executionVertexId) { + final JobVertexID jobVertexID = executionVertexId.getJobVertexId(); + final int subtaskIndex = 
executionVertexId.getSubtaskIndex(); + return coLocationGroupMap.get(jobVertexID).getLocationConstraint(subtaskIndex); + } + + private int getSlotRoundRobinIndex( + final int jobVertexParallelism, SlotSharingGroup slotSharingGroup) { + final boolean maxParallel = isMaxParallelism(jobVertexParallelism, slotSharingGroup); + return maxParallel ? 0 : slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0); + } + + private void updateSlotRoundRobinIndexIfNeeded( + final int jobVertexParallelism, + final SlotSharingGroup slotSharingGroup, + final int nextIndex) { + if (!isMaxParallelism(jobVertexParallelism, slotSharingGroup)) { + slotSharingGroupIndexMap.put(slotSharingGroup, nextIndex); + } + } + + private boolean isMaxParallelism( + final int jobVertexParallelism, final SlotSharingGroup slotSharingGroup) { + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = + paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); + return jobVertexParallelism == executionSlotSharingGroups.size(); + } + + private int getLeastUtilizeSlotIndex( + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups, + @Nullable final ExecutionVertexID executionVertexId) { + int indexWithLeastExecutionVertices = 0; + int leastExecutionVertices = Integer.MAX_VALUE; + for (int index = 0; index < executionSlotSharingGroups.size(); index++) { + final ExecutionSlotSharingGroup executionSlotSharingGroup = + executionSlotSharingGroups.get(index); + final int executionVertices = executionSlotSharingGroup.getExecutionVertexIds().size(); + if (leastExecutionVertices > executionVertices + && (Objects.isNull(executionVertexId) + || allocatable(executionSlotSharingGroup, executionVertexId))) { + indexWithLeastExecutionVertices = index; + leastExecutionVertices = executionVertices; + } + } + return indexWithLeastExecutionVertices; + } + + private boolean allocatable( + final ExecutionSlotSharingGroup executionSlotSharingGroup, + @Nonnull ExecutionVertexID executionVertexId) { + 
final JobVertexID jobVertexId = executionVertexId.getJobVertexId(); + final Set<JobVertexID> allocatedJobVertices = + executionSlotSharingGroup.getExecutionVertexIds().stream() + .map(ExecutionVertexID::getJobVertexId) + .collect(Collectors.toSet()); + return !allocatedJobVertices.contains(jobVertexId); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java index 93f6e43ba81..571ac9d3f57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java @@ -18,31 +18,17 @@ package org.apache.flink.runtime.scheduler; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.runtime.topology.Vertex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * This strategy tries to get a balanced tasks scheduling. 
Execution vertices, which are belong to @@ -65,7 +51,9 @@ class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrate protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> computeExecutionSlotSharingGroups( SchedulingTopology schedulingTopology) { return new TaskBalancedExecutionSlotSharingGroupBuilder( - schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups) + getExecutionVertices(schedulingTopology, Vertex::getId), + this.logicalSlotSharingGroups, + this.coLocationGroups) .build(); } @@ -80,231 +68,4 @@ class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrate topology, slotSharingGroups, coLocationGroups); } } - - /** SlotSharingGroupBuilder class for balanced scheduling strategy. */ - private static class TaskBalancedExecutionSlotSharingGroupBuilder { - - private final SchedulingTopology topology; - - private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap; - - /** Record the {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. */ - private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> - paralleledExecutionSlotSharingGroupsMap; - - /** - * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link - * SlotSharingGroup}s. 
- */ - private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap; - - private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> - executionSlotSharingGroupMap; - - private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap; - - private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> - constraintToExecutionSlotSharingGroupMap; - - private TaskBalancedExecutionSlotSharingGroupBuilder( - final SchedulingTopology topology, - final Set<SlotSharingGroup> slotSharingGroups, - final Set<CoLocationGroup> coLocationGroups) { - this.topology = checkNotNull(topology); - - this.coLocationGroupMap = new HashMap<>(); - for (CoLocationGroup coLocationGroup : coLocationGroups) { - for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) { - coLocationGroupMap.put(jobVertexId, coLocationGroup); - } - } - - this.constraintToExecutionSlotSharingGroupMap = new HashMap<>(); - this.paralleledExecutionSlotSharingGroupsMap = new HashMap<>(slotSharingGroups.size()); - this.slotSharingGroupIndexMap = new HashMap<>(slotSharingGroups.size()); - this.slotSharingGroupMap = new HashMap<>(); - this.executionSlotSharingGroupMap = new HashMap<>(); - - for (SlotSharingGroup slotSharingGroup : slotSharingGroups) { - for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) { - slotSharingGroupMap.put(jobVertexId, slotSharingGroup); - } - } - } - - private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() { - - final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices = - getExecutionVertices(topology); - - initParalleledExecutionSlotSharingGroupsMap(allVertices); - - // Loop on job vertices - for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> executionVertexInfos : - allVertices.entrySet()) { - - JobVertexID jobVertexID = executionVertexInfos.getKey(); - List<SchedulingExecutionVertex> executionVertices = executionVertexInfos.getValue(); - final SlotSharingGroup slotSharingGroup = 
slotSharingGroupMap.get(jobVertexID); - - if (!coLocationGroupMap.containsKey(jobVertexID)) { - // For vertices without CoLocationConstraint. - allocateNonCoLocatedVertices(slotSharingGroup, executionVertices); - } else { - // For vertices with CoLocationConstraint. - allocateCoLocatedVertices(slotSharingGroup, executionVertices); - } - } - return executionSlotSharingGroupMap; - } - - private void initParalleledExecutionSlotSharingGroupsMap( - final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices) { - - allVertices.entrySet().stream() - .map( - jobVertexExecutionVertices -> - Tuple2.of( - slotSharingGroupMap.get( - jobVertexExecutionVertices.getKey()), - jobVertexExecutionVertices.getValue().size())) - .collect( - Collectors.groupingBy( - tuple -> tuple.f0, - Collectors.summarizingInt(tuple -> tuple.f1))) - .forEach( - (slotSharingGroup, statistics) -> { - int slotNum = statistics.getMax(); - paralleledExecutionSlotSharingGroupsMap.put( - slotSharingGroup, - createExecutionSlotSharingGroups( - slotSharingGroup, slotNum)); - }); - } - - private List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups( - SlotSharingGroup slotSharingGroup, int slotNum) { - final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = - new ArrayList<>(slotNum); - for (int i = 0; i < slotNum; i++) { - final ExecutionSlotSharingGroup executionSlotSharingGroup = - new ExecutionSlotSharingGroup(slotSharingGroup); - executionSlotSharingGroups.add(i, executionSlotSharingGroup); - LOG.debug( - "Create {}th executionSlotSharingGroup {}.", i, executionSlotSharingGroup); - } - return executionSlotSharingGroups; - } - - private void allocateCoLocatedVertices( - SlotSharingGroup slotSharingGroup, - List<SchedulingExecutionVertex> executionVertices) { - - final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = - paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); - for (SchedulingExecutionVertex executionVertex : executionVertices) { - 
final CoLocationConstraint coLocationConstraint = - getCoLocationConstraint(executionVertex); - ExecutionSlotSharingGroup executionSlotSharingGroup = - constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint); - if (Objects.isNull(executionSlotSharingGroup)) { - executionSlotSharingGroup = - executionSlotSharingGroups.get( - getLeastUtilizeSlotIndex( - executionSlotSharingGroups, executionVertex)); - constraintToExecutionSlotSharingGroupMap.put( - coLocationConstraint, executionSlotSharingGroup); - } - addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, executionVertex); - } - final int jobVertexParallel = executionVertices.size(); - if (!isMaxParallelism(jobVertexParallel, slotSharingGroup)) { - int index = getLeastUtilizeSlotIndex(executionSlotSharingGroups, null); - updateSlotRoundRobinIndexIfNeeded(jobVertexParallel, slotSharingGroup, index); - } - } - - private void allocateNonCoLocatedVertices( - SlotSharingGroup slotSharingGroup, - List<SchedulingExecutionVertex> executionVertices) { - final int jobVertexParallel = executionVertices.size(); - int index = getSlotRoundRobinIndex(jobVertexParallel, slotSharingGroup); - final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = - paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); - for (SchedulingExecutionVertex executionVertex : executionVertices) { - addVertexToExecutionSlotSharingGroup( - executionSlotSharingGroups.get(index), executionVertex); - index = ++index % executionSlotSharingGroups.size(); - } - updateSlotRoundRobinIndexIfNeeded(executionVertices.size(), slotSharingGroup, index); - } - - private void addVertexToExecutionSlotSharingGroup( - ExecutionSlotSharingGroup executionSlotSharingGroup, - SchedulingExecutionVertex executionVertex) { - final ExecutionVertexID executionVertexId = executionVertex.getId(); - executionSlotSharingGroup.addVertex(executionVertexId); - executionSlotSharingGroupMap.put(executionVertexId, executionSlotSharingGroup); - } - - 
private CoLocationConstraint getCoLocationConstraint(SchedulingExecutionVertex sev) { - final JobVertexID jobVertexID = sev.getId().getJobVertexId(); - final int subtaskIndex = sev.getId().getSubtaskIndex(); - return coLocationGroupMap.get(jobVertexID).getLocationConstraint(subtaskIndex); - } - - private int getSlotRoundRobinIndex( - final int jobVertexParallelism, SlotSharingGroup slotSharingGroup) { - final boolean maxParallel = isMaxParallelism(jobVertexParallelism, slotSharingGroup); - return maxParallel ? 0 : slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0); - } - - private void updateSlotRoundRobinIndexIfNeeded( - final int jobVertexParallelism, - final SlotSharingGroup slotSharingGroup, - final int nextIndex) { - if (!isMaxParallelism(jobVertexParallelism, slotSharingGroup)) { - slotSharingGroupIndexMap.put(slotSharingGroup, nextIndex); - } - } - - private boolean isMaxParallelism( - final int jobVertexParallelism, final SlotSharingGroup slotSharingGroup) { - final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = - paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); - return jobVertexParallelism == executionSlotSharingGroups.size(); - } - - private int getLeastUtilizeSlotIndex( - final List<ExecutionSlotSharingGroup> executionSlotSharingGroups, - @Nullable final SchedulingExecutionVertex executionVertex) { - int indexWithLeastExecutionVertices = 0; - int leastExecutionVertices = Integer.MAX_VALUE; - for (int index = 0; index < executionSlotSharingGroups.size(); index++) { - final ExecutionSlotSharingGroup executionSlotSharingGroup = - executionSlotSharingGroups.get(index); - final int executionVertices = - executionSlotSharingGroup.getExecutionVertexIds().size(); - if (leastExecutionVertices > executionVertices - && (Objects.isNull(executionVertex) - || allocatable(executionSlotSharingGroup, executionVertex))) { - indexWithLeastExecutionVertices = index; - leastExecutionVertices = executionVertices; - } - } - return 
indexWithLeastExecutionVertices; - } - - private boolean allocatable( - final ExecutionSlotSharingGroup executionSlotSharingGroup, - @Nonnull SchedulingExecutionVertex executionVertex) { - final ExecutionVertexID executionVertexId = executionVertex.getId(); - final JobVertexID jobVertexId = executionVertexId.getJobVertexId(); - final Set<JobVertexID> allocatedJobVertices = - executionSlotSharingGroup.getExecutionVertexIds().stream() - .map(ExecutionVertexID::getJobVertexId) - .collect(Collectors.toSet()); - return !allocatedJobVertices.contains(jobVertexId); - } - } }
