KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1383243917


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
                 SlotManagerUtils.generateDefaultSlotResourceProfile(
                         totalResourceProfile, numSlotsPerWorker);
         this.availableResourceMatchingStrategy =
-                evenlySpreadOutSlots
+                taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   What about tasks?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    private @Nonnull SlotSharingGroup slotSharingGroup;
 
+    /** @deprecated Only for test classes. */
+    @Deprecated

Review Comment:
   `@VisibleForTesting`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##########
@@ -260,10 +261,10 @@ public static SlotManagerConfiguration fromConfiguration(
                 configuration.getBoolean(
                         
ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
 
-        boolean evenlySpreadOutSlots =
-                
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+        TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+                
TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
         final SlotMatchingStrategy slotMatchingStrategy =
-                evenlySpreadOutSlots
+                taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   ditto.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java:
##########
@@ -42,69 +43,23 @@
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, 
which are connected and
  * belong to the same SlotSharingGroup, tend to be put in the same 
ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy
+class LocalInputPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy
         implements SlotSharingStrategy, SchedulingTopologyListener {

Review Comment:
   No need to implement them again.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java:
##########
@@ -42,69 +43,23 @@
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, 
which are connected and
  * belong to the same SlotSharingGroup, tend to be put in the same 
ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy
+class LocalInputPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy
         implements SlotSharingStrategy, SchedulingTopologyListener {
 
-    private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
executionSlotSharingGroupMap;
-
-    private final Set<SlotSharingGroup> logicalSlotSharingGroups;
-
-    private final Set<CoLocationGroup> coLocationGroups;
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(LocalInputPreferredSlotSharingStrategy.class);

Review Comment:
   Where do we use it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##########
@@ -46,6 +47,12 @@ public class SlotSharingGroup implements 
java.io.Serializable {
 
     // 
--------------------------------------------------------------------------------------------
 
+    public SlotSharingGroup() {}
+
+    public SlotSharingGroup(ResourceProfile resourceProfile) {

Review Comment:
   Seems we don't need to do that. Just instantiate a SlotSharingGroup and set 
the resource profile.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##########
@@ -46,6 +47,12 @@ public class SlotSharingGroup implements 
java.io.Serializable {
 
     // 
--------------------------------------------------------------------------------------------
 
+    public SlotSharingGroup() {}
+
+    public SlotSharingGroup(ResourceProfile resourceProfile) {

Review Comment:
   Only visible for testing?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    private @Nonnull SlotSharingGroup slotSharingGroup;
 
+    /** @deprecated Only for test classes. */
+    @Deprecated
     ExecutionSlotSharingGroup() {
         this.executionVertexIds = new HashSet<>();
+        this.slotSharingGroup = new SlotSharingGroup();
+    }
+
+    ExecutionSlotSharingGroup(@Nonnull SlotSharingGroup slotSharingGroup) {
+        this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup);
+        this.executionVertexIds = new HashSet<>();
     }
 
     void addVertex(final ExecutionVertexID executionVertexId) {
         executionVertexIds.add(executionVertexId);
     }
 
-    void setResourceProfile(ResourceProfile resourceProfile) {
-        this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+    void setSlotSharingGroup(SlotSharingGroup slotSharingGroup) {
+        this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup);
+    }
+
+    @Nonnull
+    SlotSharingGroup getSlotSharingGroup() {
+        return slotSharingGroup;

Review Comment:
   They are all visible only for testing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    private @Nonnull SlotSharingGroup slotSharingGroup;
 
+    /** @deprecated Only for test classes. */
+    @Deprecated

Review Comment:
   BTW, I think there is no need to keep it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to