zentol commented on code in PR #21981:
URL: https://github.com/apache/flink/pull/21981#discussion_r1116723072


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -157,13 +165,21 @@ public Map<AllocationID, Integer> calculateScore(
                                     .getMaxParallelism(),
                             parallelism.get(evi.getJobVertexId()),
                             evi.getSubtaskIndex());
+            // Estimate state size per key group. For scoring, assume 1 if size estimate is 0 to
+            // accommodate for averaging non-zero states
+            Optional<Long> kgSizeMaybe =
+                    stateSizeEstimates.estimate(evi.getJobVertexId()).map(e -> Math.max(e, 1L));
+            if (!kgSizeMaybe.isPresent()) {
+                continue;
+            }

Review Comment:
   Does an absent estimate mean this is not a stateful operator, so it couldn't
   make use of local recovery anyway?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java:
##########
@@ -116,66 +116,53 @@ public Collection<SlotAssignment> assignSlots(
             JobInformation jobInformation,
             Collection<? extends SlotInfo> freeSlots,
             VertexParallelism vertexParallelism) {
-        Collection<? extends SlotInfo> remainingSlots = freeSlots;
-        final Collection<SlotAssignment> assignments = new ArrayList<>();
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
         for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
-
-            List<ExecutionSlotSharingGroup> sharedSlotToVertexAssignment =
-                    createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup);
-
-            SlotAssigner.AssignmentResult result =
-                    assignSlots(remainingSlots, sharedSlotToVertexAssignment);
-            remainingSlots = result.remainingSlots;
-            assignments.addAll(result.assignments);
+            allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
         }
-        return assignments;
-    }
-
-    private AssignmentResult assignSlots(

Review Comment:
   This method has become rather long. There are some simple ways to shorten
   it, like moving all of this into a separate method:
   ```
   scores = new PriorityQueue<>(Comparator.reverseOrder());
   for (ExecutionSlotSharingGroup group : allGroups) {
       calculateScore(group, parallelism)
               .forEach(
                       (allocationId, score) ->
                               scores.add(
                                       new AllocationScore(
                                               group.getId(), allocationId, score)));
   }
   ```
   The same goes for:
   ```
   final Map<JobVertexID, Integer> parallelism = new HashMap<>();
   allGroups.forEach(
           group ->
                   group.getContainedExecutionVertices()
                           .forEach(
                                   evi ->
                                           parallelism.merge(
                                                   evi.getJobVertexId(), 1, Integer::sum)));
   ```
   
   This is just boring setup work that can live outside the main method.
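
A minimal sketch of how those two setup steps could be pulled into helper methods. `Group`, `AllocationScore`, the `String` IDs and the method names `getParallelism` / `calculateScores` are simplified stand-ins chosen so the example compiles on its own; they are not the actual Flink types, nor names the PR has to use. The score function is passed in as a parameter for the same reason, and its `Long` score type is an assumption.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiFunction;

class SlotAssignerSetup {

    /** Hypothetical stand-in for ExecutionSlotSharingGroup; vertices are reduced to job vertex IDs. */
    static final class Group {
        final String id;
        final List<String> jobVertexIds; // one entry per contained execution vertex

        Group(String id, List<String> jobVertexIds) {
            this.id = id;
            this.jobVertexIds = jobVertexIds;
        }
    }

    /** Hypothetical stand-in for AllocationScore, naturally ordered by ascending score. */
    static final class AllocationScore implements Comparable<AllocationScore> {
        final String groupId;
        final String allocationId;
        final long score;

        AllocationScore(String groupId, String allocationId, long score) {
            this.groupId = groupId;
            this.allocationId = allocationId;
            this.score = score;
        }

        @Override
        public int compareTo(AllocationScore other) {
            return Long.compare(score, other.score);
        }
    }

    /** Counts how many execution vertices each job vertex contributes across all groups. */
    static Map<String, Integer> getParallelism(Collection<Group> allGroups) {
        final Map<String, Integer> parallelism = new HashMap<>();
        for (Group group : allGroups) {
            for (String jobVertexId : group.jobVertexIds) {
                parallelism.merge(jobVertexId, 1, Integer::sum);
            }
        }
        return parallelism;
    }

    /** Collects one score entry per (group, allocation) pair, highest score first. */
    static PriorityQueue<AllocationScore> calculateScores(
            Collection<Group> allGroups,
            Map<String, Integer> parallelism,
            BiFunction<Group, Map<String, Integer>, Map<String, Long>> calculateScore) {
        final PriorityQueue<AllocationScore> scores =
                new PriorityQueue<>(Comparator.reverseOrder());
        for (Group group : allGroups) {
            calculateScore
                    .apply(group, parallelism)
                    .forEach(
                            (allocationId, score) ->
                                    scores.add(
                                            new AllocationScore(
                                                    group.id, allocationId, score)));
        }
        return scores;
    }
}
```

With helpers along these lines, `assignSlots` would only need two calls for the setup before the actual assignment loop, which keeps the interesting logic in one place.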



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
