1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390238967


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##########
@@ -295,6 +294,19 @@ public static SlotManagerConfiguration fromConfiguration(
                 redundantTaskManagerNum);
     }
 
+    @VisibleForTesting
+    public static SlotMatchingStrategy 
getSlotMatchingStrategy(TaskManagerLoadBalanceMode mode) {

Review Comment:
   ```suggestion
      static SlotMatchingStrategy 
getSlotMatchingStrategy(TaskManagerLoadBalanceMode mode) {
   ```
   
   Is the default (package-private) visibility enough here? As far as I can see, all callers are in the same package.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> 
clcToESsgMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.clcToESsgMap = new HashMap<>();
+            this.ssgToExecutionSSGs = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotRoundRobinIndex = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) 
{
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            computeSsgToESsgMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry 
:
+                    allVertices.entrySet()) {
+                SlotSharingGroup ssg = slotSharingGroupMap.get(entry.getKey());
+                List<ExecutionSlotSharingGroup> executionSSGs = 
ssgToExecutionSSGs.get(ssg);
+                int index = getSlotRoundRobinIndex(entry.getValue().size(), 
ssg);
+
+                for (SchedulingExecutionVertex sev : entry.getValue()) {
+                    // For no CoLocations or with CoLocations with max 
parallelism of the SSG.
+                    if (!coLocationGroupMap.containsKey(entry.getKey())
+                            || ssgToExecutionSSGs.get(ssg).size() == 
entry.getValue().size()) {
+                        index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                        continue;
+                    }
+
+                    // For CoLocations with exited execution slot sharing 
group.
+                    final CoLocationConstraint clc = 
getCoLocationConstraint(entry.getKey(), sev);
+                    if (clcToESsgMap.get(clc) != null) {
+                        clcToESsgMap.get(clc).addVertex(sev.getId());
+                        executionSlotSharingGroupMap.put(sev.getId(), 
clcToESsgMap.get(clc));
+                        continue;
+                    }
+                    // For CoLocations without existed execution slot sharing 
group.
+                    index = findAvailableESSGForCoLocation(sev, executionSSGs);
+                    clcToESsgMap.put(clc, executionSSGs.get(index));
+                    index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                }
+                updateSlotRoundRobinIndexIfNeeded(entry.getValue().size(), 
ssg, index);
+            }
+            return executionSlotSharingGroupMap;
+        }
+
+        private void computeSsgToESsgMap(
+                LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices) {
+
+            allVertices.entrySet().stream()
+                    .map(
+                            entry ->
+                                    new AbstractMap.SimpleEntry<>(
+                                            entry.getKey(), 
entry.getValue().size()))
+                    .collect(
+                            Collectors.groupingBy(entry -> 
slotSharingGroupMap.get(entry.getKey())))
+                    .forEach(
+                            (ssg, entries) -> {
+                                Integer slotNumOfSsg =
+                                        entries.stream()
+                                                .map(Map.Entry::getValue)
+                                                .max(Comparator.comparingInt(o 
-> o))
+                                                .orElseThrow(
+                                                        () ->
+                                                                new 
FlinkRuntimeException(
+                                                                        
String.format(
+                                                                               
 "Error state when computing the number of slots in %s",
+                                                                               
 ssg)));
+                                List<ExecutionSlotSharingGroup> eSsgs =
+                                        new ArrayList<>(slotNumOfSsg);
+                                for (int i = 0; i < slotNumOfSsg; i++) {
+                                    ExecutionSlotSharingGroup 
executionSlotSharingGroup =
+                                            new ExecutionSlotSharingGroup(ssg);
+                                    eSsgs.add(i, executionSlotSharingGroup);
+                                    LOG.debug(
+                                            "Set resourceProfile {} for {}th 
executionSlotSharingGroup {} in slotSharingGroup {}.",
+                                            ssg.getResourceProfile(),
+                                            i,
+                                            executionSlotSharingGroup,
+                                            ssg);
+                                }
+                                ssgToExecutionSSGs.put(ssg, eSsgs);
+                            });
+        }
+
+        /** Find the execution slot sharing group that holds the least 
execution tasks. */
+        private int findAvailableESSGForCoLocation(
+                SchedulingExecutionVertex sev, List<ExecutionSlotSharingGroup> 
executionSSGs) {
+            int indexWithLeastExecutionVertices = 0;
+            int leastExecutionVertices = Integer.MAX_VALUE;
+            for (int index = 0; index < executionSSGs.size(); index++) {
+                if 
(executionSSGs.get(index).getExecutionVertexIds().contains(sev.getId())) {
+                    continue;
+                }
+                if (leastExecutionVertices
+                        > 
executionSSGs.get(index).getExecutionVertexIds().size()) {
+                    indexWithLeastExecutionVertices = index;
+                    leastExecutionVertices =
+                            
executionSSGs.get(index).getExecutionVertexIds().size();
+                }
+            }
+            return indexWithLeastExecutionVertices;
+        }
+
+        private CoLocationConstraint getCoLocationConstraint(
+                JobVertexID jobVertexID, SchedulingExecutionVertex sev) {
+            
Preconditions.checkState(coLocationGroupMap.containsKey(sev.getId().getJobVertexId()));
+            return coLocationGroupMap
+                    .get(jobVertexID)
+                    .getLocationConstraint(sev.getId().getSubtaskIndex());
+        }
+
+        /**
+         * Add the SchedulingExecutionVertex into a ExecutionSlotSharingGroup 
by the index and
+         * return the next available index.
+         *
+         * @param executionSSGs All execution slot sharing groups for current 
SlotSharingGroup.
+         * @param index The index of the target ExecutionSlotSharingGroup.
+         * @param sev The target ExecutionSlotSharingGroup
+         * @return The next available round-robin slot index.
+         */
+        private int addVertexToExecutionSlotSharingGroup(
+                List<ExecutionSlotSharingGroup> executionSSGs,
+                int index,
+                SchedulingExecutionVertex sev) {
+            executionSSGs.get(index).addVertex(sev.getId());
+            executionSlotSharingGroupMap.put(sev.getId(), 
executionSSGs.get(index));
+            index = ++index % executionSSGs.size();
+            return index;
+        }

Review Comment:
   Can it be simplified to this?
   
   ```
          private void addVertexToExecutionSlotSharingGroup(
                   ExecutionSlotSharingGroup executionSlotSharingGroup,
                   List<ExecutionSlotSharingGroup> executionSSGs,
                   ExecutionVertexID executionVertexID) {
               executionSlotSharingGroup.addVertex(executionVertexID);
               executionSlotSharingGroupMap.put(executionVertexID, 
executionSlotSharingGroup);
           }
   ```
   
   Reasons:
   1. The method no longer needs to care about the index, which makes it clearer.
   2. The `if (clcToESsgMap.get(clc) != null) {` branch of
`TaskBalancedExecutionSlotSharingGroupBuilder#build` can use the new
`addVertexToExecutionSlotSharingGroup` as well.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> 
clcToESsgMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.clcToESsgMap = new HashMap<>();
+            this.ssgToExecutionSSGs = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotRoundRobinIndex = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) 
{
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            computeSsgToESsgMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry 
:
+                    allVertices.entrySet()) {
+                SlotSharingGroup ssg = slotSharingGroupMap.get(entry.getKey());
+                List<ExecutionSlotSharingGroup> executionSSGs = 
ssgToExecutionSSGs.get(ssg);
+                int index = getSlotRoundRobinIndex(entry.getValue().size(), 
ssg);
+
+                for (SchedulingExecutionVertex sev : entry.getValue()) {
+                    // For no CoLocations or with CoLocations with max 
parallelism of the SSG.
+                    if (!coLocationGroupMap.containsKey(entry.getKey())
+                            || ssgToExecutionSSGs.get(ssg).size() == 
entry.getValue().size()) {
+                        index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                        continue;
+                    }
+
+                    // For CoLocations with exited execution slot sharing 
group.
+                    final CoLocationConstraint clc = 
getCoLocationConstraint(entry.getKey(), sev);
+                    if (clcToESsgMap.get(clc) != null) {
+                        clcToESsgMap.get(clc).addVertex(sev.getId());
+                        executionSlotSharingGroupMap.put(sev.getId(), 
clcToESsgMap.get(clc));
+                        continue;
+                    }
+                    // For CoLocations without existed execution slot sharing 
group.
+                    index = findAvailableESSGForCoLocation(sev, executionSSGs);
+                    clcToESsgMap.put(clc, executionSSGs.get(index));
+                    index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                }
+                updateSlotRoundRobinIndexIfNeeded(entry.getValue().size(), 
ssg, index);
+            }
+            return executionSlotSharingGroupMap;
+        }
+
+        private void computeSsgToESsgMap(
+                LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices) {
+
+            allVertices.entrySet().stream()
+                    .map(
+                            entry ->
+                                    new AbstractMap.SimpleEntry<>(
+                                            entry.getKey(), 
entry.getValue().size()))
+                    .collect(
+                            Collectors.groupingBy(entry -> 
slotSharingGroupMap.get(entry.getKey())))
+                    .forEach(
+                            (ssg, entries) -> {
+                                Integer slotNumOfSsg =
+                                        entries.stream()
+                                                .map(Map.Entry::getValue)
+                                                .max(Comparator.comparingInt(o 
-> o))
+                                                .orElseThrow(
+                                                        () ->
+                                                                new 
FlinkRuntimeException(
+                                                                        
String.format(
+                                                                               
 "Error state when computing the number of slots in %s",
+                                                                               
 ssg)));

Review Comment:
   ```suggestion
                                                                           
Tuple2.of(
                                               
slotSharingGroupMap.get(entry.getKey()),
                                               entry.getValue().size()))
                       .collect(
                               Collectors.groupingBy(
                                       tuple -> tuple.f0,
                                       Collectors.summarizingInt(tuple -> 
tuple.f1)))
                       .forEach(
                               (ssg, statistics) -> {
                                   int slotNumOfSsg = statistics.getMax();
   ```
   
   
   How about this update? The aggregation only needs the `ssg` and the `parallelism`, so:
   - First step: map each entry to its ssg and parallelism.
   - Second step: compute the max per ssg with `summarizingInt`.
   - Third step: get the max directly. In theory the Optional should never be empty
(every group produced by `groupingBy` contains at least one element), so
`summarizingInt` is easier to use (it doesn't return an Optional).
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> 
clcToESsgMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.clcToESsgMap = new HashMap<>();
+            this.ssgToExecutionSSGs = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotRoundRobinIndex = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) 
{
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            computeSsgToESsgMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry 
:
+                    allVertices.entrySet()) {
+                SlotSharingGroup ssg = slotSharingGroupMap.get(entry.getKey());
+                List<ExecutionSlotSharingGroup> executionSSGs = 
ssgToExecutionSSGs.get(ssg);
+                int index = getSlotRoundRobinIndex(entry.getValue().size(), 
ssg);
+
+                for (SchedulingExecutionVertex sev : entry.getValue()) {
+                    // For no CoLocations or with CoLocations with max 
parallelism of the SSG.
+                    if (!coLocationGroupMap.containsKey(entry.getKey())
+                            || ssgToExecutionSSGs.get(ssg).size() == 
entry.getValue().size()) {
+                        index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                        continue;
+                    }
+
+                    // For CoLocations with exited execution slot sharing 
group.
+                    final CoLocationConstraint clc = 
getCoLocationConstraint(entry.getKey(), sev);
+                    if (clcToESsgMap.get(clc) != null) {
+                        clcToESsgMap.get(clc).addVertex(sev.getId());
+                        executionSlotSharingGroupMap.put(sev.getId(), 
clcToESsgMap.get(clc));
+                        continue;
+                    }
+                    // For CoLocations without existed execution slot sharing 
group.
+                    index = findAvailableESSGForCoLocation(sev, executionSSGs);
+                    clcToESsgMap.put(clc, executionSSGs.get(index));
+                    index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                }
+                updateSlotRoundRobinIndexIfNeeded(entry.getValue().size(), 
ssg, index);
+            }
+            return executionSlotSharingGroupMap;
+        }
+
+        private void computeSsgToESsgMap(
+                LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices) {
+
+            allVertices.entrySet().stream()
+                    .map(
+                            entry ->
+                                    new AbstractMap.SimpleEntry<>(
+                                            entry.getKey(), 
entry.getValue().size()))
+                    .collect(
+                            Collectors.groupingBy(entry -> 
slotSharingGroupMap.get(entry.getKey())))
+                    .forEach(
+                            (ssg, entries) -> {
+                                Integer slotNumOfSsg =
+                                        entries.stream()
+                                                .map(Map.Entry::getValue)
+                                                .max(Comparator.comparingInt(o 
-> o))
+                                                .orElseThrow(
+                                                        () ->
+                                                                new 
FlinkRuntimeException(
+                                                                        
String.format(
+                                                                               
 "Error state when computing the number of slots in %s",
+                                                                               
 ssg)));
+                                List<ExecutionSlotSharingGroup> eSsgs =
+                                        new ArrayList<>(slotNumOfSsg);
+                                for (int i = 0; i < slotNumOfSsg; i++) {
+                                    ExecutionSlotSharingGroup 
executionSlotSharingGroup =
+                                            new ExecutionSlotSharingGroup(ssg);
+                                    eSsgs.add(i, executionSlotSharingGroup);
+                                    LOG.debug(
+                                            "Set resourceProfile {} for {}th 
executionSlotSharingGroup {} in slotSharingGroup {}.",
+                                            ssg.getResourceProfile(),
+                                            i,
+                                            executionSlotSharingGroup,
+                                            ssg);

Review Comment:
   ```suggestion
                                       LOG.debug(
                                               "Create {}th 
executionSlotSharingGroup {}.",
                                               i,
                                               executionSlotSharingGroup);
   ```
   
   The resource profile and ssg may not be needed here, because:
   - `executionSlotSharingGroup.toString` includes `slotSharingGroup`
   - `slotSharingGroup.toString` includes `resourceProfile`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to