mxm commented on code in PR #21908:
URL: https://github.com/apache/flink/pull/21908#discussion_r1103096757
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java:
##########
@@ -133,7 +138,56 @@ public Optional<VertexParallelismWithSlotSharing> determineParallelism(
         return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
     }

-    private static Map<JobVertexID, Integer> determineParallelism(
+    /**
+     * Distributes free slots across the slot-sharing groups of the job. Slots are distributed as
+     * evenly as possible while taking the minimum parallelism of contained vertices into account.
+     */
+    private static Map<SlotSharingGroupId, Integer> determineSlotsPerSharingGroup(
+            JobInformation jobInformation, int freeSlots) {
+        int numUnassignedSlots = freeSlots;
+        int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size();
+
+        final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = new HashMap<>();
+
+        for (Tuple2<SlotSharingGroup, Integer> slotSharingGroup :
+                sortSlotSharingGroupsByUpperParallelism(jobInformation)) {
+            final int groupParallelism =
+                    Math.min(
+                            slotSharingGroup.f1,
+                            numUnassignedSlots / numUnassignedSlotSharingGroups);

Review Comment:
   That said, all of this is not set in stone. We can probably add a mode to the adaptive scheduler which disables downscaling in combination with pre-requesting the right amount of resources.
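
For readers following the thread, the even distribution described in the Javadoc of the quoted hunk can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code in the PR: `SlotDistributionSketch` and `distributeSlots` are hypothetical names, groups are identified by plain strings instead of `SlotSharingGroupId`, and the bookkeeping after the `Math.min` call is assumed, since the quoted hunk ends before that point.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SlotDistributionSketch {

    /**
     * Hypothetical stand-in for the allocator logic. Each group is a plain string
     * mapped to the largest parallelism it can make use of; no Flink types are used.
     */
    static Map<String, Integer> distributeSlots(
            Map<String, Integer> upperParallelismPerGroup, int freeSlots) {
        // Visit groups with the smallest upper bound first, so that slots they
        // cannot use are carried over to the groups that come later.
        List<Map.Entry<String, Integer>> sorted =
                new ArrayList<>(upperParallelismPerGroup.entrySet());
        sorted.sort(Map.Entry.comparingByValue());

        int unassignedSlots = freeSlots;
        int unassignedGroups = sorted.size();
        Map<String, Integer> slotsPerGroup = new LinkedHashMap<>();

        for (Map.Entry<String, Integer> group : sorted) {
            // Even share of the remaining slots, capped by the group's upper bound.
            int share = Math.min(group.getValue(), unassignedSlots / unassignedGroups);
            slotsPerGroup.put(group.getKey(), share);
            // Assumed bookkeeping: the quoted hunk is cut off before this step.
            unassignedSlots -= share;
            unassignedGroups--;
        }
        return slotsPerGroup;
    }

    public static void main(String[] args) {
        Map<String, Integer> groups = new LinkedHashMap<>();
        groups.put("source", 2);
        groups.put("map", 8);
        groups.put("sink", 8);
        // With 10 free slots, "source" is capped at 2 and the rest is split evenly.
        System.out.println(distributeSlots(groups, 10));
    }
}
```

In this sketch a group never receives more slots than its upper bound, and a group that gets a share of zero signals that there are too few free slots. How the actual allocator handles that case is not visible in the quoted hunk.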