1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1385855309


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    private @Nonnull SlotSharingGroup slotSharingGroup;
 
+    /** @deprecated Only for test classes. */
+    @Deprecated

Review Comment:
   The constructor should be marked as `@VisibleForTesting` instead of `@Deprecated` if it's only used by tests.
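
A minimal sketch of the difference, under stated assumptions: `SharingGroupSketch` and its string-valued group name are hypothetical stand-ins for `ExecutionSlotSharingGroup`, and the annotation is declared locally only so the snippet compiles without Flink on the classpath (in the PR it would be Flink's own `org.apache.flink.annotation.VisibleForTesting`).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Objects;

// Hypothetical local stand-in so the sketch compiles without Flink on the classpath.
@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.FIELD, ElementType.TYPE})
@interface VisibleForTesting {}

// Simplified stand-in for ExecutionSlotSharingGroup; the group is just a name here.
class SharingGroupSketch {
    private final String slotSharingGroupName;

    // Production constructor: callers must supply the group.
    SharingGroupSketch(String slotSharingGroupName) {
        this.slotSharingGroupName = Objects.requireNonNull(slotSharingGroupName);
    }

    // Test-only convenience constructor. @VisibleForTesting documents the intent,
    // whereas @Deprecated would signal planned removal and cause deprecation
    // warnings at call sites.
    @VisibleForTesting
    SharingGroupSketch() {
        this("test-group");
    }

    String getSlotSharingGroupName() {
        return slotSharingGroupName;
    }
}
```

The annotation does not change behavior; it only tells readers and static-analysis tools why the constructor is visible.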



##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##########
@@ -708,6 +711,60 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in milliseconds to finish all pending timer threads"
                                     + " when the stream task is cancelled.");
 
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_TASK_MANAGER
+    })
+    public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
+            ConfigOptions.key("taskmanager.load-balance.mode")
+                    .enumType(TaskManagerLoadBalanceMode.class)
+                    .defaultValue(TaskManagerLoadBalanceMode.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Mode for the load-balance allocation strategy across all available %s.",
+                                            code("TaskManagers"))
+                                    .list(
+                                            text(
+                                                    "The %s mode tries to spread out the slots evenly across all available %s.",
+                                                    code(TaskManagerLoadBalanceMode.SLOTS.name()),
+                                                    code("TaskManagers")),
+                                            text(
+                                                    "The %s mode tries to ensure that the number of tasks is relative balanced across all available %s.",

Review Comment:
   ```suggestion
                                                       "The %s mode tries to spread out the tasks evenly across all available %s.",
   ```
   
   It's better to keep the wording consistent with the SLOTS description.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java:
##########
@@ -84,4 +98,23 @@ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContex
                 allocationTimeout,
                 context::getResourceProfile);
     }
+
+    private static SlotSharingStrategy.Factory getSlotSharingStrategyFactory(
+            @Nonnull Configuration configuration) {
+
+        TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+                TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
+        if (taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.TASKS) {
+            return new TaskBalancedPreferredSlotSharingStrategy.Factory();
+        }
+        if (taskManagerLoadBalanceMode != TaskManagerLoadBalanceMode.NONE
+                && taskManagerLoadBalanceMode != TaskManagerLoadBalanceMode.SLOTS) {
+            LOG.warn(
+                    "The value '{}' of '{}' isn't supported now, it will rollback to default strategy '{}'.",
+                    taskManagerLoadBalanceMode,
+                    TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.key(),
+                    TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue());
+        }
+        return new LocalInputPreferredSlotSharingStrategy.Factory();

Review Comment:
   ```suggestion
           if (taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.NONE
                   || taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS) {
               return new LocalInputPreferredSlotSharingStrategy.Factory();
           }
           throw new UnsupportedXXXException("Unknown TaskManagerLoadBalanceMode");
   ```
   
   How about throwing an exception instead of calling `LOG.warn`? If another developer adds a new enum value in the future, the exception will alert them that this logic must be updated to support the new value properly.
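
To illustrate the fail-fast idea, here is a self-contained sketch. The enum `LoadBalanceModeSketch`, the class `StrategySelectorSketch`, and the use of strategy names as plain strings are hypothetical simplifications; `UnsupportedOperationException` is only one possible choice for the placeholder `UnsupportedXXXException` above.

```java
// Hypothetical stand-in for TaskManagerLoadBalanceMode.
enum LoadBalanceModeSketch {
    NONE,
    SLOTS,
    TASKS
}

final class StrategySelectorSketch {
    private StrategySelectorSketch() {}

    // Returns the name of the strategy that would be chosen for the given mode.
    static String selectStrategyName(LoadBalanceModeSketch mode) {
        switch (mode) {
            case TASKS:
                return "TaskBalancedPreferredSlotSharingStrategy";
            case NONE:
            case SLOTS:
                return "LocalInputPreferredSlotSharingStrategy";
            default:
                // Reached only if a new enum constant is added without updating
                // this method; failing loudly makes the omission obvious.
                throw new UnsupportedOperationException(
                        "Unknown TaskManagerLoadBalanceMode: " + mode);
        }
    }
}
```

With a warning, a newly added mode would silently fall back to the default strategy; with the exception, the first test that exercises the new mode fails and points at this method.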



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */
+class TaskBalancedPreferredSlotSharingStrategyTest {
+
+    @Test
+    void testCoLocationConstraintIsRespected() {
+        TestingSchedulingTopology topology = new TestingSchedulingTopology();
+        List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> jobVertexInfos =
+                new ArrayList<>();
+        SlotSharingGroup ssg1 = new SlotSharingGroup();
+        CoLocationGroup clg1 = new CoLocationGroupImpl();
+        CoLocationGroup clg2 = new CoLocationGroupImpl();
+        List<MockedJobVertex> mockedJobVertices = getMockedJobVertices(ssg1, clg1, clg2);
+        setupCase(mockedJobVertices, topology, jobVertexInfos);
+
+        final SlotSharingStrategy strategy =
+                new TaskBalancedPreferredSlotSharingStrategy(
+                        topology, Sets.newHashSet(ssg1), Sets.newHashSet(clg1, clg2));
+        List<TestingSchedulingExecutionVertex> executionVertices1 = jobVertexInfos.get(1).f1;
+        List<TestingSchedulingExecutionVertex> executionVertices2 = jobVertexInfos.get(2).f1;
+        TestingSchedulingExecutionVertex executionVertexOfJv0 = jobVertexInfos.get(0).f1.get(0);
+        ExecutionSlotSharingGroup eSSGOfExecutionVertexOfJv0 =
+                strategy.getExecutionSlotSharingGroup(executionVertexOfJv0.getId());
+
+        assertThat(executionVertices1).hasSameSizeAs(executionVertices2);
+        for (int i = 0; i < executionVertices1.size(); i++) {
+            ExecutionSlotSharingGroup eSSG =
+                    strategy.getExecutionSlotSharingGroup(executionVertices1.get(i).getId());
+            assertThat(eSSG).isNotEqualTo(eSSGOfExecutionVertexOfJv0);
+            assertThat(eSSG)
+                    .isEqualTo(
+                            strategy.getExecutionSlotSharingGroup(
+                                    executionVertices2.get(i).getId()));
+        }
+
+        List<TestingSchedulingExecutionVertex> executionVertices3 = jobVertexInfos.get(3).f1;
+        List<TestingSchedulingExecutionVertex> executionVertices4 = jobVertexInfos.get(4).f1;
+        for (int i = 0; i < executionVertices1.size(); i++) {
+            assertThat(strategy.getExecutionSlotSharingGroup(executionVertices3.get(i).getId()))
+                    .isEqualTo(
+                            strategy.getExecutionSlotSharingGroup(
+                                    executionVertices4.get(i).getId()));
+        }
+    }
+
+    @Nonnull
+    private static ArrayList<MockedJobVertex> getMockedJobVertices(
+            SlotSharingGroup ssg1, CoLocationGroup clg1, CoLocationGroup clg2) {
+        return Lists.newArrayList(
+                new MockedJobVertex(1, ssg1, null),
+                new MockedJobVertex(2, ssg1, clg1),
+                new MockedJobVertex(2, ssg1, clg1),
+                new MockedJobVertex(3, ssg1, clg2),
+                new MockedJobVertex(3, ssg1, clg2));
+    }
+
+    @Test
+    void testVerticesInDifferentSlotSharingGroups() {
+        TestingSchedulingTopology topology = new TestingSchedulingTopology();
+        List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> jobVertexInfos =
+                new ArrayList<>();
+        SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
+        SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
+        List<MockedJobVertex> mockedJobVertices =
+                Lists.newArrayList(
+                        new MockedJobVertex(1, slotSharingGroup1, null),
+                        new MockedJobVertex(2, slotSharingGroup1, null),
+                        new MockedJobVertex(3, slotSharingGroup1, null),
+                        new MockedJobVertex(1, slotSharingGroup2, null),
+                        new MockedJobVertex(2, slotSharingGroup2, null),
+                        new MockedJobVertex(2, slotSharingGroup2, null));
+
+        setupCase(mockedJobVertices, topology, jobVertexInfos);
+
+        final SlotSharingStrategy strategy =
+                new TaskBalancedPreferredSlotSharingStrategy(
+                        topology,
+                        Sets.newHashSet(slotSharingGroup1, slotSharingGroup2),
+                        Collections.emptySet());
+        assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(5);
+        checkBalanceAtSlotsLevelWithoutCoLocation(strategy);
+
+        List<TestingSchedulingExecutionVertex> executionVertices4 = jobVertexInfos.get(4).f1;
+        List<TestingSchedulingExecutionVertex> executionVertices5 = jobVertexInfos.get(5).f1;
+        assertThat(executionVertices4).hasSameSizeAs(executionVertices5);
+        // Check for JVs whose parallelism is the max in the same SSG.
+        for (int i = 0; i < executionVertices4.size(); i++) {
+            TestingSchedulingExecutionVertex executionVertex4 = executionVertices4.get(i);
+            assertThat(strategy.getExecutionSlotSharingGroup(executionVertex4.getId()))
+                    .isEqualTo(
+                            strategy.getExecutionSlotSharingGroup(
+                                    executionVertices5.get(i).getId()));
+        }
+    }
+
+    private static void checkBalanceAtSlotsLevelWithoutCoLocation(SlotSharingStrategy strategy) {
+        strategy.getExecutionSlotSharingGroups().stream()
+                .collect(Collectors.groupingBy(ExecutionSlotSharingGroup::getSlotSharingGroup))
+                .forEach(
+                        (ssg, eSSGs) -> {
+                            Optional<Integer> max =
+                                    eSSGs.stream()
+                                            .map(eSsg -> eSsg.getExecutionVertexIds().size())
+                                            .max(Comparator.comparing(i -> i));
+                            Optional<Integer> min =
+                                    eSSGs.stream()
+                                            .map(eSsg -> eSsg.getExecutionVertexIds().size())
+                                            .min(Comparator.comparing(i -> i));
+                            assertThat(max.get()).isCloseTo(min.get(), Offset.offset(1));
+                        });
+    }
+
+    @Test
+    void testSetSlotSharingGroupResource() {
+        TestingSchedulingTopology topology = new TestingSchedulingTopology();
+        final JobVertexID jobVertexId1 = new JobVertexID();
+        final JobVertexID jobVertexId2 = new JobVertexID();
+        final TestingSchedulingExecutionVertex ev11 = topology.newExecutionVertex(jobVertexId1, 0);
+        final TestingSchedulingExecutionVertex ev12 = topology.newExecutionVertex(jobVertexId1, 1);
+        final TestingSchedulingExecutionVertex ev21 = topology.newExecutionVertex(jobVertexId2, 0);
+
+        final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
+        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
+        slotSharingGroup1.addVertexToGroup(jobVertexId1);
+        slotSharingGroup1.setResourceProfile(resourceProfile1);
+
+        final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
+        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
+        slotSharingGroup2.addVertexToGroup(jobVertexId2);
+        slotSharingGroup2.setResourceProfile(resourceProfile2);
+
+        final Set<SlotSharingGroup> slotSharingGroups =
+                Sets.newHashSet(slotSharingGroup1, slotSharingGroup2);
+
+        final SlotSharingStrategy strategy =
+                new LocalInputPreferredSlotSharingStrategy(
+                        topology, slotSharingGroups, Collections.emptySet());
+
+        assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(3);
+        assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getResourceProfile())
+                .isEqualTo(resourceProfile1);
+        assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getResourceProfile())
+                .isEqualTo(resourceProfile1);
+        assertThat(strategy.getExecutionSlotSharingGroup(ev21.getId()).getResourceProfile())
+                .isEqualTo(resourceProfile2);
+    }
+
+    private void setupCase(
+            List<MockedJobVertex> mockedJobVertices,
+            TestingSchedulingTopology topology,
+            List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> jobVertexInfos) {
+        for (MockedJobVertex mockedJobVertex : mockedJobVertices) {
+            List<TestingSchedulingExecutionVertex> tSEVs = new ArrayList<>();
+            for (int subIndex = 0; subIndex < mockedJobVertex.parallelism; subIndex++) {
+                tSEVs.add(topology.newExecutionVertex(mockedJobVertex.jobVertex.getID(), subIndex));
+            }
+            jobVertexInfos.add(Tuple2.of(mockedJobVertex.jobVertex.getID(), tSEVs));
+        }
+    }
+
+    /** Util class to represent the simple job vertex information. */
+    static class MockedJobVertex {

Review Comment:
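   In general, a class named `MockedXxx` is a subclass of `Xxx`, but `MockedJobVertex` only wraps a job vertex rather than extending `JobVertex`.
   
   I suggest renaming the class so the name reflects what it actually is.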



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    private @Nonnull SlotSharingGroup slotSharingGroup;

Review Comment:
   Could this field be marked `final`? That would be safer.
   
   If so, the setter can be removed and the value passed in directly through your new constructor.
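
A rough sketch of that shape, with hypothetical stand-in types (`SlotSharingGroupSketch` and `ExecutionGroupSketch` are simplified placeholders, not the classes in the PR):

```java
import java.util.Objects;

// Hypothetical stand-in for SlotSharingGroup.
final class SlotSharingGroupSketch {
    private final String name;

    SlotSharingGroupSketch(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

// Hypothetical stand-in for ExecutionSlotSharingGroup.
class ExecutionGroupSketch {
    // final: assigned exactly once in the constructor, so no setter is needed
    // and the reference cannot change after construction.
    private final SlotSharingGroupSketch slotSharingGroup;

    ExecutionGroupSketch(SlotSharingGroupSketch slotSharingGroup) {
        // Enforce the non-null contract at construction time.
        this.slotSharingGroup = Objects.requireNonNull(slotSharingGroup, "slotSharingGroup");
    }

    SlotSharingGroupSketch getSlotSharingGroup() {
        return slotSharingGroup;
    }
}
```

Passing the group through the constructor also means the `@Nonnull` contract can be checked once, instead of relying on callers to invoke a setter before first use.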



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
