1996fanrui commented on code in PR #25218:
URL: https://github.com/apache/flink/pull/25218#discussion_r1759649975


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -18,18 +18,123 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
-/** Interface for assigning slots to slot sharing groups. */
+import static java.util.function.Function.identity;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+
+/** The Interface for assigning slots to slot sharing groups. */
 @Internal
 public interface SlotAssigner {
 
+    /**
+     * The helper class to represent the allocation score on the specified 
group and allocated slot.
+     */
+    class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String groupId;
+        private final AllocationID allocationId;
+        private final long score;
+
+        public AllocationScore(String groupId, AllocationID allocationId, long 
score) {
+            this.groupId = groupId;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {

Review Comment:
   `SlotAssigner` uses `StateLocalitySlotAssigner` is a little wired.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java:
##########
@@ -48,14 +50,30 @@ public Collection<SlotAssignment> assignSlots(
             
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
         }
 
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Iterator<? extends SlotInfo> iterator =
+                selectSlotsInMinimalTaskExecutors(freeSlots, allGroups, 
Collections.emptyList())
+                        .iterator();

Review Comment:
   I feel the current change is a little complex. IIUC, `DefaultSlotAssigner` 
only needs to change one line is enough.
   
   All changes of `SlotAssigner` are not needed. They could simplified into 
`StateLocalitySlotAssigner`.
   
   Please correct me if my understanding is wrong.
   
   ```suggestion
           Iterator<? extends SlotInfo> iterator = 
freeSlots.stream().sorted(Comparator.comparing(SlotInfo::getTaskManagerLocation)).iterator();
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java:
##########
@@ -18,18 +18,123 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
-/** Interface for assigning slots to slot sharing groups. */
+import static java.util.function.Function.identity;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+
+/** The Interface for assigning slots to slot sharing groups. */
 @Internal
 public interface SlotAssigner {
 
+    /**
+     * The helper class to represent the allocation score on the specified 
group and allocated slot.
+     */
+    class AllocationScore implements Comparable<AllocationScore> {
+
+        private final String groupId;
+        private final AllocationID allocationId;
+        private final long score;
+
+        public AllocationScore(String groupId, AllocationID allocationId, long 
score) {
+            this.groupId = groupId;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+            int result = Long.compare(score, other.score);
+            if (result != 0) {
+                return result;
+            }
+            result = other.allocationId.compareTo(allocationId);
+            if (result != 0) {
+                return result;
+            }
+            return other.groupId.compareTo(groupId);
+        }
+    }
+
     Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism,
             JobAllocationsInformation previousAllocations);
+
+    /**
+     * Select the target slots to assign with the requested groups.
+     *
+     * @param slots the raw slots to filter.
+     * @param groups the request execution slot sharing groups.
+     * @param scores the allocation scores.
+     * @return the target slots that are distributed on the minimal task 
executors.
+     */
+    default Collection<? extends SlotInfo> selectSlotsInMinimalTaskExecutors(
+            Collection<? extends SlotInfo> slots,
+            Collection<ExecutionSlotSharingGroup> groups,
+            Collection<AllocationScore> scores) {
+        if (slots.size() - groups.size() <= 0) {
+            return slots;
+        }
+
+        List<TaskManagerLocation> orderedTaskExecutors =
+                sortPrioritizedTaskExecutors(slots, scores);
+        Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> 
slotsByTaskExecutor =
+                SlotAssigner.getSlotsPerTaskExecutor(slots);

Review Comment:
   SlotAssigner.getSlotsPerTaskExecutor is called inside of 
`sortPrioritizedTaskExecutors` as well. It's not needed to call multiple times.
   
   Anyway, this could be avoided if my first comment is suitable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to