This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 252f6521aa8f74dbdb96f1e789c37dcb2da59138
Author: Roc Marshal <[email protected]>
AuthorDate: Mon Dec 2 22:38:31 2024 +0800

    [FLINK-33389][refactor] Make TaskBalancedExecutionSlotSharingGroupBuilder 
a separate class.
---
 .../scheduler/AbstractSlotSharingStrategy.java     |  19 +-
 .../LocalInputPreferredSlotSharingStrategy.java    |   3 +-
 ...skBalancedExecutionSlotSharingGroupBuilder.java | 263 +++++++++++++++++++++
 .../TaskBalancedPreferredSlotSharingStrategy.java  | 247 +------------------
 4 files changed, 282 insertions(+), 250 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java
index d13c62909b3..27485fcd777 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java
@@ -34,6 +34,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -93,17 +94,23 @@ abstract class AbstractSlotSharingStrategy
     /**
      * The vertices are topologically sorted since {@link 
DefaultExecutionTopology#getVertices} are
      * topologically sorted.
+     *
+     * @param topology The job topology.
+     * @param executionVertexIdentifierMapper The vertex identifier mapper 
function to convert an
+     *     execution vertex to a target identifier.
+     * @return The vertex identifiers in topological order.
+     * @param <T> The type of the execution vertex identifier.
      */
     @Nonnull
-    static LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
getExecutionVertices(
-            SchedulingTopology topology) {
-        final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
vertices =
-                new LinkedHashMap<>();
+    static <T> LinkedHashMap<JobVertexID, List<T>> getExecutionVertices(
+            SchedulingTopology topology,
+            Function<SchedulingExecutionVertex, T> 
executionVertexIdentifierMapper) {
+        final LinkedHashMap<JobVertexID, List<T>> vertices = new 
LinkedHashMap<>();
         for (SchedulingExecutionVertex executionVertex : 
topology.getVertices()) {
-            final List<SchedulingExecutionVertex> executionVertexGroup =
+            final List<T> executionVertexGroup =
                     vertices.computeIfAbsent(
                             executionVertex.getId().getJobVertexId(), k -> new 
ArrayList<>());
-            executionVertexGroup.add(executionVertex);
+            
executionVertexGroup.add(executionVertexIdentifierMapper.apply(executionVertex));
         }
         return vertices;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
index f4c0e854512..8a57e6b5cdb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -168,7 +169,7 @@ class LocalInputPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy
          */
         private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
             final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
-                    getExecutionVertices(topology);
+                    getExecutionVertices(topology, Function.identity());
 
             // loop on job vertices so that an execution vertex will not be 
added into a group
             // if that group better fits another execution vertex
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedExecutionSlotSharingGroupBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedExecutionSlotSharingGroupBuilder.java
new file mode 100644
index 00000000000..a6ea8df5181
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedExecutionSlotSharingGroupBuilder.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Builder of execution slot sharing groups based on the balanced scheduling 
strategy. */
+public class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedExecutionSlotSharingGroupBuilder.class);
+
+    private final Map<JobVertexID, List<ExecutionVertexID>> allVertices;
+
+    private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+    /** Record the {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s. */
+    private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>>
+            paralleledExecutionSlotSharingGroupsMap;
+
+    /**
+     * Record the next round-robin {@link ExecutionSlotSharingGroup} index for 
{@link
+     * SlotSharingGroup}s.
+     */
+    private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap;
+
+    private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
executionSlotSharingGroupMap;
+
+    private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+    private final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
+            constraintToExecutionSlotSharingGroupMap;
+
+    public TaskBalancedExecutionSlotSharingGroupBuilder(
+            final Map<JobVertexID, List<ExecutionVertexID>> allVertices,
+            final Collection<SlotSharingGroup> slotSharingGroups,
+            final Collection<CoLocationGroup> coLocationGroups) {
+        this.allVertices = checkNotNull(allVertices);
+
+        this.coLocationGroupMap = new HashMap<>();
+        for (CoLocationGroup coLocationGroup : coLocationGroups) {
+            for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) {
+                coLocationGroupMap.put(jobVertexId, coLocationGroup);
+            }
+        }
+
+        this.constraintToExecutionSlotSharingGroupMap = new HashMap<>();
+        this.paralleledExecutionSlotSharingGroupsMap = new 
HashMap<>(slotSharingGroups.size());
+        this.slotSharingGroupIndexMap = new 
HashMap<>(slotSharingGroups.size());
+        this.slotSharingGroupMap = new HashMap<>();
+        this.executionSlotSharingGroupMap = new HashMap<>();
+
+        for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
+            for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) 
{
+                slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+            }
+        }
+    }
+
+    public Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+
+        initParalleledExecutionSlotSharingGroupsMap(allVertices);
+
+        // Loop on job vertices
+        for (Map.Entry<JobVertexID, List<ExecutionVertexID>> 
executionVertexInfos :
+                allVertices.entrySet()) {
+
+            JobVertexID jobVertexID = executionVertexInfos.getKey();
+            List<ExecutionVertexID> executionVertexIds = 
executionVertexInfos.getValue();
+            final SlotSharingGroup slotSharingGroup = 
slotSharingGroupMap.get(jobVertexID);
+
+            if (!coLocationGroupMap.containsKey(jobVertexID)) {
+                // For vertices without CoLocationConstraint.
+                allocateNonCoLocatedVertices(slotSharingGroup, 
executionVertexIds);
+            } else {
+                // For vertices with CoLocationConstraint.
+                allocateCoLocatedVertices(slotSharingGroup, 
executionVertexIds);
+            }
+        }
+        return executionSlotSharingGroupMap;
+    }
+
+    private void initParalleledExecutionSlotSharingGroupsMap(
+            final Map<JobVertexID, List<ExecutionVertexID>> allVertices) {
+
+        allVertices.entrySet().stream()
+                .map(
+                        jobVertexExecutionVertices ->
+                                Tuple2.of(
+                                        slotSharingGroupMap.get(
+                                                
jobVertexExecutionVertices.getKey()),
+                                        
jobVertexExecutionVertices.getValue().size()))
+                .collect(
+                        Collectors.groupingBy(
+                                tuple -> tuple.f0, 
Collectors.summarizingInt(tuple -> tuple.f1)))
+                .forEach(
+                        (slotSharingGroup, statistics) -> {
+                            int slotNum = statistics.getMax();
+                            paralleledExecutionSlotSharingGroupsMap.put(
+                                    slotSharingGroup,
+                                    
createExecutionSlotSharingGroups(slotSharingGroup, slotNum));
+                        });
+    }
+
+    private List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
+            SlotSharingGroup slotSharingGroup, int slotNum) {
+        final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = new 
ArrayList<>(slotNum);
+        for (int i = 0; i < slotNum; i++) {
+            final ExecutionSlotSharingGroup executionSlotSharingGroup =
+                    new ExecutionSlotSharingGroup(slotSharingGroup);
+            executionSlotSharingGroups.add(i, executionSlotSharingGroup);
+            LOG.debug(
+                    "Create {}-th(st/nd) executionSlotSharingGroup {}.",
+                    i,
+                    executionSlotSharingGroup);
+        }
+        return executionSlotSharingGroups;
+    }
+
+    private void allocateCoLocatedVertices(
+            SlotSharingGroup slotSharingGroup, List<ExecutionVertexID> 
executionVertexIds) {
+
+        final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
+                paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
+        for (ExecutionVertexID executionVertexId : executionVertexIds) {
+            final CoLocationConstraint coLocationConstraint =
+                    getCoLocationConstraint(executionVertexId);
+            ExecutionSlotSharingGroup executionSlotSharingGroup =
+                    
constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint);
+            if (Objects.isNull(executionSlotSharingGroup)) {
+                executionSlotSharingGroup =
+                        executionSlotSharingGroups.get(
+                                getLeastUtilizeSlotIndex(
+                                        executionSlotSharingGroups, 
executionVertexId));
+                constraintToExecutionSlotSharingGroupMap.put(
+                        coLocationConstraint, executionSlotSharingGroup);
+            }
+            addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, 
executionVertexId);
+        }
+        final int jobVertexParallel = executionVertexIds.size();
+        if (!isMaxParallelism(jobVertexParallel, slotSharingGroup)) {
+            int index = getLeastUtilizeSlotIndex(executionSlotSharingGroups, 
null);
+            updateSlotRoundRobinIndexIfNeeded(jobVertexParallel, 
slotSharingGroup, index);
+        }
+    }
+
+    private void allocateNonCoLocatedVertices(
+            SlotSharingGroup slotSharingGroup, List<ExecutionVertexID> 
executionVertices) {
+        final int jobVertexParallel = executionVertices.size();
+        int index = getSlotRoundRobinIndex(jobVertexParallel, 
slotSharingGroup);
+        final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
+                paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
+        for (ExecutionVertexID executionVertexId : executionVertices) {
+            addVertexToExecutionSlotSharingGroup(
+                    executionSlotSharingGroups.get(index), executionVertexId);
+            index = ++index % executionSlotSharingGroups.size();
+        }
+        updateSlotRoundRobinIndexIfNeeded(executionVertices.size(), 
slotSharingGroup, index);
+    }
+
+    private void addVertexToExecutionSlotSharingGroup(
+            ExecutionSlotSharingGroup executionSlotSharingGroup,
+            ExecutionVertexID executionVertexId) {
+        executionSlotSharingGroup.addVertex(executionVertexId);
+        executionSlotSharingGroupMap.put(executionVertexId, 
executionSlotSharingGroup);
+    }
+
+    private CoLocationConstraint getCoLocationConstraint(ExecutionVertexID 
executionVertexId) {
+        final JobVertexID jobVertexID = executionVertexId.getJobVertexId();
+        final int subtaskIndex = executionVertexId.getSubtaskIndex();
+        return 
coLocationGroupMap.get(jobVertexID).getLocationConstraint(subtaskIndex);
+    }
+
+    private int getSlotRoundRobinIndex(
+            final int jobVertexParallelism, SlotSharingGroup slotSharingGroup) 
{
+        final boolean maxParallel = isMaxParallelism(jobVertexParallelism, 
slotSharingGroup);
+        return maxParallel ? 0 : 
slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0);
+    }
+
+    private void updateSlotRoundRobinIndexIfNeeded(
+            final int jobVertexParallelism,
+            final SlotSharingGroup slotSharingGroup,
+            final int nextIndex) {
+        if (!isMaxParallelism(jobVertexParallelism, slotSharingGroup)) {
+            slotSharingGroupIndexMap.put(slotSharingGroup, nextIndex);
+        }
+    }
+
+    private boolean isMaxParallelism(
+            final int jobVertexParallelism, final SlotSharingGroup 
slotSharingGroup) {
+        final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
+                paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
+        return jobVertexParallelism == executionSlotSharingGroups.size();
+    }
+
+    private int getLeastUtilizeSlotIndex(
+            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups,
+            @Nullable final ExecutionVertexID executionVertexId) {
+        int indexWithLeastExecutionVertices = 0;
+        int leastExecutionVertices = Integer.MAX_VALUE;
+        for (int index = 0; index < executionSlotSharingGroups.size(); 
index++) {
+            final ExecutionSlotSharingGroup executionSlotSharingGroup =
+                    executionSlotSharingGroups.get(index);
+            final int executionVertices = 
executionSlotSharingGroup.getExecutionVertexIds().size();
+            if (leastExecutionVertices > executionVertices
+                    && (Objects.isNull(executionVertexId)
+                            || allocatable(executionSlotSharingGroup, 
executionVertexId))) {
+                indexWithLeastExecutionVertices = index;
+                leastExecutionVertices = executionVertices;
+            }
+        }
+        return indexWithLeastExecutionVertices;
+    }
+
+    private boolean allocatable(
+            final ExecutionSlotSharingGroup executionSlotSharingGroup,
+            @Nonnull ExecutionVertexID executionVertexId) {
+        final JobVertexID jobVertexId = executionVertexId.getJobVertexId();
+        final Set<JobVertexID> allocatedJobVertices =
+                executionSlotSharingGroup.getExecutionVertexIds().stream()
+                        .map(ExecutionVertexID::getJobVertexId)
+                        .collect(Collectors.toSet());
+        return !allocatedJobVertices.contains(jobVertexId);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java
index 93f6e43ba81..571ac9d3f57 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java
@@ -18,31 +18,17 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.topology.Vertex;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
@@ -65,7 +51,9 @@ class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrate
     protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionSlotSharingGroups(
             SchedulingTopology schedulingTopology) {
         return new TaskBalancedExecutionSlotSharingGroupBuilder(
-                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                        getExecutionVertices(schedulingTopology, 
Vertex::getId),
+                        this.logicalSlotSharingGroups,
+                        this.coLocationGroups)
                 .build();
     }
 
@@ -80,231 +68,4 @@ class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrate
                     topology, slotSharingGroups, coLocationGroups);
         }
     }
-
-    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
-    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
-
-        private final SchedulingTopology topology;
-
-        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
-
-        /** Record the {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s. */
-        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>>
-                paralleledExecutionSlotSharingGroupsMap;
-
-        /**
-         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
-         * SlotSharingGroup}s.
-         */
-        private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap;
-
-        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
-                executionSlotSharingGroupMap;
-
-        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
-
-        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
-                constraintToExecutionSlotSharingGroupMap;
-
-        private TaskBalancedExecutionSlotSharingGroupBuilder(
-                final SchedulingTopology topology,
-                final Set<SlotSharingGroup> slotSharingGroups,
-                final Set<CoLocationGroup> coLocationGroups) {
-            this.topology = checkNotNull(topology);
-
-            this.coLocationGroupMap = new HashMap<>();
-            for (CoLocationGroup coLocationGroup : coLocationGroups) {
-                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
-                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
-                }
-            }
-
-            this.constraintToExecutionSlotSharingGroupMap = new HashMap<>();
-            this.paralleledExecutionSlotSharingGroupsMap = new 
HashMap<>(slotSharingGroups.size());
-            this.slotSharingGroupIndexMap = new 
HashMap<>(slotSharingGroups.size());
-            this.slotSharingGroupMap = new HashMap<>();
-            this.executionSlotSharingGroupMap = new HashMap<>();
-
-            for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
-                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
-                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
-                }
-            }
-        }
-
-        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
-
-            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
-                    getExecutionVertices(topology);
-
-            initParalleledExecutionSlotSharingGroupsMap(allVertices);
-
-            // Loop on job vertices
-            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> 
executionVertexInfos :
-                    allVertices.entrySet()) {
-
-                JobVertexID jobVertexID = executionVertexInfos.getKey();
-                List<SchedulingExecutionVertex> executionVertices = 
executionVertexInfos.getValue();
-                final SlotSharingGroup slotSharingGroup = 
slotSharingGroupMap.get(jobVertexID);
-
-                if (!coLocationGroupMap.containsKey(jobVertexID)) {
-                    // For vertices without CoLocationConstraint.
-                    allocateNonCoLocatedVertices(slotSharingGroup, 
executionVertices);
-                } else {
-                    // For vertices with CoLocationConstraint.
-                    allocateCoLocatedVertices(slotSharingGroup, 
executionVertices);
-                }
-            }
-            return executionSlotSharingGroupMap;
-        }
-
-        private void initParalleledExecutionSlotSharingGroupsMap(
-                final LinkedHashMap<JobVertexID, 
List<SchedulingExecutionVertex>> allVertices) {
-
-            allVertices.entrySet().stream()
-                    .map(
-                            jobVertexExecutionVertices ->
-                                    Tuple2.of(
-                                            slotSharingGroupMap.get(
-                                                    
jobVertexExecutionVertices.getKey()),
-                                            
jobVertexExecutionVertices.getValue().size()))
-                    .collect(
-                            Collectors.groupingBy(
-                                    tuple -> tuple.f0,
-                                    Collectors.summarizingInt(tuple -> 
tuple.f1)))
-                    .forEach(
-                            (slotSharingGroup, statistics) -> {
-                                int slotNum = statistics.getMax();
-                                paralleledExecutionSlotSharingGroupsMap.put(
-                                        slotSharingGroup,
-                                        createExecutionSlotSharingGroups(
-                                                slotSharingGroup, slotNum));
-                            });
-        }
-
-        private List<ExecutionSlotSharingGroup> 
createExecutionSlotSharingGroups(
-                SlotSharingGroup slotSharingGroup, int slotNum) {
-            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
-                    new ArrayList<>(slotNum);
-            for (int i = 0; i < slotNum; i++) {
-                final ExecutionSlotSharingGroup executionSlotSharingGroup =
-                        new ExecutionSlotSharingGroup(slotSharingGroup);
-                executionSlotSharingGroups.add(i, executionSlotSharingGroup);
-                LOG.debug(
-                        "Create {}th executionSlotSharingGroup {}.", i, 
executionSlotSharingGroup);
-            }
-            return executionSlotSharingGroups;
-        }
-
-        private void allocateCoLocatedVertices(
-                SlotSharingGroup slotSharingGroup,
-                List<SchedulingExecutionVertex> executionVertices) {
-
-            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
-                    
paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
-            for (SchedulingExecutionVertex executionVertex : 
executionVertices) {
-                final CoLocationConstraint coLocationConstraint =
-                        getCoLocationConstraint(executionVertex);
-                ExecutionSlotSharingGroup executionSlotSharingGroup =
-                        
constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint);
-                if (Objects.isNull(executionSlotSharingGroup)) {
-                    executionSlotSharingGroup =
-                            executionSlotSharingGroups.get(
-                                    getLeastUtilizeSlotIndex(
-                                            executionSlotSharingGroups, 
executionVertex));
-                    constraintToExecutionSlotSharingGroupMap.put(
-                            coLocationConstraint, executionSlotSharingGroup);
-                }
-                
addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, 
executionVertex);
-            }
-            final int jobVertexParallel = executionVertices.size();
-            if (!isMaxParallelism(jobVertexParallel, slotSharingGroup)) {
-                int index = 
getLeastUtilizeSlotIndex(executionSlotSharingGroups, null);
-                updateSlotRoundRobinIndexIfNeeded(jobVertexParallel, 
slotSharingGroup, index);
-            }
-        }
-
-        private void allocateNonCoLocatedVertices(
-                SlotSharingGroup slotSharingGroup,
-                List<SchedulingExecutionVertex> executionVertices) {
-            final int jobVertexParallel = executionVertices.size();
-            int index = getSlotRoundRobinIndex(jobVertexParallel, 
slotSharingGroup);
-            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
-                    
paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
-            for (SchedulingExecutionVertex executionVertex : 
executionVertices) {
-                addVertexToExecutionSlotSharingGroup(
-                        executionSlotSharingGroups.get(index), 
executionVertex);
-                index = ++index % executionSlotSharingGroups.size();
-            }
-            updateSlotRoundRobinIndexIfNeeded(executionVertices.size(), 
slotSharingGroup, index);
-        }
-
-        private void addVertexToExecutionSlotSharingGroup(
-                ExecutionSlotSharingGroup executionSlotSharingGroup,
-                SchedulingExecutionVertex executionVertex) {
-            final ExecutionVertexID executionVertexId = 
executionVertex.getId();
-            executionSlotSharingGroup.addVertex(executionVertexId);
-            executionSlotSharingGroupMap.put(executionVertexId, 
executionSlotSharingGroup);
-        }
-
-        private CoLocationConstraint 
getCoLocationConstraint(SchedulingExecutionVertex sev) {
-            final JobVertexID jobVertexID = sev.getId().getJobVertexId();
-            final int subtaskIndex = sev.getId().getSubtaskIndex();
-            return 
coLocationGroupMap.get(jobVertexID).getLocationConstraint(subtaskIndex);
-        }
-
-        private int getSlotRoundRobinIndex(
-                final int jobVertexParallelism, SlotSharingGroup 
slotSharingGroup) {
-            final boolean maxParallel = isMaxParallelism(jobVertexParallelism, 
slotSharingGroup);
-            return maxParallel ? 0 : 
slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0);
-        }
-
-        private void updateSlotRoundRobinIndexIfNeeded(
-                final int jobVertexParallelism,
-                final SlotSharingGroup slotSharingGroup,
-                final int nextIndex) {
-            if (!isMaxParallelism(jobVertexParallelism, slotSharingGroup)) {
-                slotSharingGroupIndexMap.put(slotSharingGroup, nextIndex);
-            }
-        }
-
-        private boolean isMaxParallelism(
-                final int jobVertexParallelism, final SlotSharingGroup 
slotSharingGroup) {
-            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
-                    
paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
-            return jobVertexParallelism == executionSlotSharingGroups.size();
-        }
-
-        private int getLeastUtilizeSlotIndex(
-                final List<ExecutionSlotSharingGroup> 
executionSlotSharingGroups,
-                @Nullable final SchedulingExecutionVertex executionVertex) {
-            int indexWithLeastExecutionVertices = 0;
-            int leastExecutionVertices = Integer.MAX_VALUE;
-            for (int index = 0; index < executionSlotSharingGroups.size(); 
index++) {
-                final ExecutionSlotSharingGroup executionSlotSharingGroup =
-                        executionSlotSharingGroups.get(index);
-                final int executionVertices =
-                        
executionSlotSharingGroup.getExecutionVertexIds().size();
-                if (leastExecutionVertices > executionVertices
-                        && (Objects.isNull(executionVertex)
-                                || allocatable(executionSlotSharingGroup, 
executionVertex))) {
-                    indexWithLeastExecutionVertices = index;
-                    leastExecutionVertices = executionVertices;
-                }
-            }
-            return indexWithLeastExecutionVertices;
-        }
-
-        private boolean allocatable(
-                final ExecutionSlotSharingGroup executionSlotSharingGroup,
-                @Nonnull SchedulingExecutionVertex executionVertex) {
-            final ExecutionVertexID executionVertexId = 
executionVertex.getId();
-            final JobVertexID jobVertexId = executionVertexId.getJobVertexId();
-            final Set<JobVertexID> allocatedJobVertices =
-                    executionSlotSharingGroup.getExecutionVertexIds().stream()
-                            .map(ExecutionVertexID::getJobVertexId)
-                            .collect(Collectors.toSet());
-            return !allocatedJobVertices.contains(jobVertexId);
-        }
-    }
 }

Reply via email to