KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1383243917
########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ########## @@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy( SlotManagerUtils.generateDefaultSlotResourceProfile( totalResourceProfile, numSlotsPerWorker); this.availableResourceMatchingStrategy = - evenlySpreadOutSlots + taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: What about tasks? ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ########## @@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup { private final Set<ExecutionVertexID> executionVertexIds; - private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; + private @Nonnull SlotSharingGroup slotSharingGroup; + /** @deprecated Only for test classes. */ + @Deprecated Review Comment: `@VisibleForTesting` ########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java: ########## @@ -260,10 +261,10 @@ public static SlotManagerConfiguration fromConfiguration( configuration.getBoolean( ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED); - boolean evenlySpreadOutSlots = - configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY); + TaskManagerLoadBalanceMode taskManagerLoadBalanceMode = + TaskManagerLoadBalanceMode.loadFromConfiguration(configuration); final SlotMatchingStrategy slotMatchingStrategy = - evenlySpreadOutSlots + taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: ditto. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java: ########## @@ -42,69 +43,23 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected and * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. * Co-location constraints will be respected. */ -class LocalInputPreferredSlotSharingStrategy +class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy implements SlotSharingStrategy, SchedulingTopologyListener { Review Comment: No need to implement them again. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java: ########## @@ -42,69 +43,23 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected and * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. * Co-location constraints will be respected. */ -class LocalInputPreferredSlotSharingStrategy +class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy implements SlotSharingStrategy, SchedulingTopologyListener { - private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap; - - private final Set<SlotSharingGroup> logicalSlotSharingGroups; - - private final Set<CoLocationGroup> coLocationGroups; + public static final Logger LOG = + LoggerFactory.getLogger(LocalInputPreferredSlotSharingStrategy.class); Review Comment: Where do we use it? ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java: ########## @@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable { // -------------------------------------------------------------------------------------------- + public SlotSharingGroup() {} + + public SlotSharingGroup(ResourceProfile resourceProfile) { Review Comment: Seems we don't need to do that. Just instantiate a SlotSharingGroup and set the resource profile. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java: ########## @@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable { // -------------------------------------------------------------------------------------------- + public SlotSharingGroup() {} + + public SlotSharingGroup(ResourceProfile resourceProfile) { Review Comment: Only visible for testing? ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ########## @@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup { private final Set<ExecutionVertexID> executionVertexIds; - private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; + private @Nonnull SlotSharingGroup slotSharingGroup; + /** @deprecated Only for test classes. */ + @Deprecated ExecutionSlotSharingGroup() { this.executionVertexIds = new HashSet<>(); + this.slotSharingGroup = new SlotSharingGroup(); + } + + ExecutionSlotSharingGroup(@Nonnull SlotSharingGroup slotSharingGroup) { + this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup); + this.executionVertexIds = new HashSet<>(); } void addVertex(final ExecutionVertexID executionVertexId) { executionVertexIds.add(executionVertexId); } - void setResourceProfile(ResourceProfile resourceProfile) { - this.resourceProfile = Preconditions.checkNotNull(resourceProfile); + void setSlotSharingGroup(SlotSharingGroup slotSharingGroup) { + this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup); + } + + @Nonnull + SlotSharingGroup getSlotSharingGroup() { + return slotSharingGroup; Review Comment: They are all visible only for testing. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ########## @@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup { private final Set<ExecutionVertexID> executionVertexIds; - private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; + private @Nonnull SlotSharingGroup slotSharingGroup; + /** @deprecated Only for test classes. */ + @Deprecated Review Comment: BTW, I think there is no need to keep it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org