KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390593913


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##########
@@ -76,6 +76,6 @@ public ResourceProfile getResourceProfile() {
 
     @Override
     public String toString() {
-        return "SlotSharingGroup " + this.ids.toString();
+        return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + 
resourceProfile + '}';

Review Comment:
   This should belong to a separate hotfix commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +35,26 @@ class ExecutionSlotSharingGroup {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    @Nonnull private final SlotSharingGroup slotSharingGroup;

Review Comment:
   IIUC, we change it to `SlotSharingGroup` only for tests? I don't think that's 
good practice.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> 
clcToESsgMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.clcToESsgMap = new HashMap<>();
+            this.ssgToExecutionSSGs = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotRoundRobinIndex = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) 
{
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            computeSsgToESsgMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry 
:
+                    allVertices.entrySet()) {
+                SlotSharingGroup ssg = slotSharingGroupMap.get(entry.getKey());
+                List<ExecutionSlotSharingGroup> executionSSGs = 
ssgToExecutionSSGs.get(ssg);
+                int index = getSlotRoundRobinIndex(entry.getValue().size(), 
ssg);
+
+                for (SchedulingExecutionVertex sev : entry.getValue()) {
+                    // For no CoLocations or with CoLocations with max 
parallelism of the SSG.
+                    if (!coLocationGroupMap.containsKey(entry.getKey())
+                            || ssgToExecutionSSGs.get(ssg).size() == 
entry.getValue().size()) {
+                        index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                        continue;
+                    }
+
+                    // For CoLocations with exited execution slot sharing 
group.
+                    final CoLocationConstraint clc = 
getCoLocationConstraint(entry.getKey(), sev);
+                    if (clcToESsgMap.get(clc) != null) {
+                        clcToESsgMap.get(clc).addVertex(sev.getId());
+                        executionSlotSharingGroupMap.put(sev.getId(), 
clcToESsgMap.get(clc));
+                        continue;
+                    }
+                    // For CoLocations without existed execution slot sharing 
group.
+                    index = findAvailableESSGForCoLocation(sev, executionSSGs);
+                    clcToESsgMap.put(clc, executionSSGs.get(index));
+                    index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                }
+                updateSlotRoundRobinIndexIfNeeded(entry.getValue().size(), 
ssg, index);
+            }
+            return executionSlotSharingGroupMap;
+        }
+
+        private void computeSsgToESsgMap(
+                LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices) {
+
+            allVertices.entrySet().stream()
+                    .map(
+                            entry ->
+                                    new AbstractMap.SimpleEntry<>(
+                                            entry.getKey(), 
entry.getValue().size()))
+                    .collect(
+                            Collectors.groupingBy(entry -> 
slotSharingGroupMap.get(entry.getKey())))
+                    .forEach(
+                            (ssg, entries) -> {
+                                Integer slotNumOfSsg =
+                                        entries.stream()
+                                                .map(Map.Entry::getValue)
+                                                .max(Comparator.comparingInt(o 
-> o))
+                                                .orElseThrow(
+                                                        () ->
+                                                                new 
FlinkRuntimeException(
+                                                                        
String.format(
+                                                                               
 "Error state when computing the number of slots in %s",
+                                                                               
 ssg)));
+                                List<ExecutionSlotSharingGroup> eSsgs =
+                                        new ArrayList<>(slotNumOfSsg);
+                                for (int i = 0; i < slotNumOfSsg; i++) {
+                                    ExecutionSlotSharingGroup 
executionSlotSharingGroup =
+                                            new ExecutionSlotSharingGroup(ssg);
+                                    eSsgs.add(i, executionSlotSharingGroup);
+                                    LOG.debug(
+                                            "Set resourceProfile {} for {}th 
executionSlotSharingGroup {} in slotSharingGroup {}.",
+                                            ssg.getResourceProfile(),
+                                            i,
+                                            executionSlotSharingGroup,
+                                            ssg);
+                                }
+                                ssgToExecutionSSGs.put(ssg, eSsgs);
+                            });
+        }
+
+        /** Find the execution slot sharing group that holds the least 
execution tasks. */
+        private int findAvailableESSGForCoLocation(
+                SchedulingExecutionVertex sev, List<ExecutionSlotSharingGroup> 
executionSSGs) {
+            int indexWithLeastExecutionVertices = 0;
+            int leastExecutionVertices = Integer.MAX_VALUE;
+            for (int index = 0; index < executionSSGs.size(); index++) {
+                if 
(executionSSGs.get(index).getExecutionVertexIds().contains(sev.getId())) {

Review Comment:
   Does this mean the sev is deployed twice? If so, shouldn't that be a bug?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> 
clcToESsgMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.clcToESsgMap = new HashMap<>();
+            this.ssgToExecutionSSGs = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotRoundRobinIndex = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) 
{
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            computeSsgToESsgMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry 
:
+                    allVertices.entrySet()) {
+                SlotSharingGroup ssg = slotSharingGroupMap.get(entry.getKey());
+                List<ExecutionSlotSharingGroup> executionSSGs = 
ssgToExecutionSSGs.get(ssg);
+                int index = getSlotRoundRobinIndex(entry.getValue().size(), 
ssg);
+
+                for (SchedulingExecutionVertex sev : entry.getValue()) {
+                    // For no CoLocations or with CoLocations with max 
parallelism of the SSG.
+                    if (!coLocationGroupMap.containsKey(entry.getKey())
+                            || ssgToExecutionSSGs.get(ssg).size() == 
entry.getValue().size()) {
+                        index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                        continue;
+                    }
+
+                    // For CoLocations with exited execution slot sharing 
group.
+                    final CoLocationConstraint clc = 
getCoLocationConstraint(entry.getKey(), sev);

Review Comment:
   Why should there be a colocation group for this vertex? What about vertices 
whose parallelism is less than the size of the essg?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> 
clcToESsgMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.clcToESsgMap = new HashMap<>();
+            this.ssgToExecutionSSGs = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotRoundRobinIndex = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) 
{
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            computeSsgToESsgMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry 
:
+                    allVertices.entrySet()) {
+                SlotSharingGroup ssg = slotSharingGroupMap.get(entry.getKey());

Review Comment:
   We'd better assign "entry.getKey()" to a new variable for better readability. 
The same applies to the other occurrences.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java:
##########
@@ -100,6 +100,7 @@ private static DefaultSchedulerComponents 
createPipelinedRegionSchedulerComponen
                 new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
         final ExecutionSlotAllocatorFactory allocatorFactory =
                 new SlotSharingExecutionSlotAllocatorFactory(
+                        jobMasterConfiguration,

Review Comment:
   We should follow the principle of least exposure. Here we only need the 
TaskManagerLoadBalanceMode instead of the whole configuration.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;

Review Comment:
   We don't encourage using so many abbreviations like this. The same applies to those below.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> 
clcToESsgMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.clcToESsgMap = new HashMap<>();
+            this.ssgToExecutionSSGs = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotRoundRobinIndex = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) 
{
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            computeSsgToESsgMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry 
:
+                    allVertices.entrySet()) {
+                SlotSharingGroup ssg = slotSharingGroupMap.get(entry.getKey());
+                List<ExecutionSlotSharingGroup> executionSSGs = 
ssgToExecutionSSGs.get(ssg);
+                int index = getSlotRoundRobinIndex(entry.getValue().size(), 
ssg);
+
+                for (SchedulingExecutionVertex sev : entry.getValue()) {
+                    // For no CoLocations or with CoLocations with max 
parallelism of the SSG.
+                    if (!coLocationGroupMap.containsKey(entry.getKey())
+                            || ssgToExecutionSSGs.get(ssg).size() == 
entry.getValue().size()) {
+                        index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);

Review Comment:
   There is no need to update the index here. Do I understand that correctly?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> 
ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> 
clcToESsgMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.clcToESsgMap = new HashMap<>();
+            this.ssgToExecutionSSGs = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotRoundRobinIndex = new 
HashMap<>(logicalSlotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) 
{
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            computeSsgToESsgMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> entry 
:
+                    allVertices.entrySet()) {
+                SlotSharingGroup ssg = slotSharingGroupMap.get(entry.getKey());
+                List<ExecutionSlotSharingGroup> executionSSGs = 
ssgToExecutionSSGs.get(ssg);
+                int index = getSlotRoundRobinIndex(entry.getValue().size(), 
ssg);
+
+                for (SchedulingExecutionVertex sev : entry.getValue()) {
+                    // For no CoLocations or with CoLocations with max 
parallelism of the SSG.
+                    if (!coLocationGroupMap.containsKey(entry.getKey())
+                            || ssgToExecutionSSGs.get(ssg).size() == 
entry.getValue().size()) {
+                        index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                        continue;
+                    }
+
+                    // For CoLocations with exited execution slot sharing 
group.
+                    final CoLocationConstraint clc = 
getCoLocationConstraint(entry.getKey(), sev);
+                    if (clcToESsgMap.get(clc) != null) {
+                        clcToESsgMap.get(clc).addVertex(sev.getId());
+                        executionSlotSharingGroupMap.put(sev.getId(), 
clcToESsgMap.get(clc));
+                        continue;
+                    }
+                    // For CoLocations without existed execution slot sharing 
group.
+                    index = findAvailableESSGForCoLocation(sev, executionSSGs);
+                    clcToESsgMap.put(clc, executionSSGs.get(index));
+                    index = 
addVertexToExecutionSlotSharingGroup(executionSSGs, index, sev);
+                }
+                updateSlotRoundRobinIndexIfNeeded(entry.getValue().size(), 
ssg, index);
+            }
+            return executionSlotSharingGroupMap;
+        }
+
+        private void computeSsgToESsgMap(
+                LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices) {
+
+            allVertices.entrySet().stream()
+                    .map(
+                            entry ->
+                                    new AbstractMap.SimpleEntry<>(
+                                            entry.getKey(), 
entry.getValue().size()))
+                    .collect(
+                            Collectors.groupingBy(entry -> 
slotSharingGroupMap.get(entry.getKey())))
+                    .forEach(
+                            (ssg, entries) -> {
+                                Integer slotNumOfSsg =
+                                        entries.stream()
+                                                .map(Map.Entry::getValue)
+                                                .max(Comparator.comparingInt(o 
-> o))
+                                                .orElseThrow(
+                                                        () ->
+                                                                new 
FlinkRuntimeException(
+                                                                        
String.format(
+                                                                               
 "Error state when computing the number of slots in %s",
+                                                                               
 ssg)));
+                                List<ExecutionSlotSharingGroup> eSsgs =
+                                        new ArrayList<>(slotNumOfSsg);
+                                for (int i = 0; i < slotNumOfSsg; i++) {
+                                    ExecutionSlotSharingGroup 
executionSlotSharingGroup =
+                                            new ExecutionSlotSharingGroup(ssg);
+                                    eSsgs.add(i, executionSlotSharingGroup);
+                                    LOG.debug(
+                                            "Set resourceProfile {} for {}th 
executionSlotSharingGroup {} in slotSharingGroup {}.",
+                                            ssg.getResourceProfile(),
+                                            i,
+                                            executionSlotSharingGroup,
+                                            ssg);
+                                }
+                                ssgToExecutionSSGs.put(ssg, eSsgs);
+                            });
+        }
+
+        /** Find the execution slot sharing group that holds the least 
execution tasks. */
+        private int findAvailableESSGForCoLocation(

Review Comment:
   This has nothing to do with co-location.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to