xintongsong commented on a change in pull request #10682: [FLINK-15247][Runtime] Wait for all slots to be free before task executor services shutdown upon stopping URL: https://github.com/apache/flink/pull/10682#discussion_r361274776
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java ########## @@ -403,54 +410,60 @@ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFo TaskSlot taskSlot = getTaskSlot(allocationId); if (taskSlot != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Free slot {}.", taskSlot, cause); - } else { - LOG.info("Free slot {}.", taskSlot); - } + return freeSlot(taskSlot, cause); + } else { + throw new SlotNotFoundException(allocationId); + } + } - final JobID jobId = taskSlot.getJobId(); + private int freeSlot(TaskSlot taskSlot, Throwable cause) { + AllocationID allocationId = taskSlot.getAllocationId(); - if (taskSlot.isEmpty()) { - // remove the allocation id to task slot mapping - allocatedSlots.remove(allocationId); + if (LOG.isDebugEnabled()) { + LOG.debug("Free slot {}.", taskSlot, cause); + } else { + LOG.info("Free slot {}.", taskSlot); + } - // unregister a potential timeout - timerService.unregisterTimeout(allocationId); + final JobID jobId = taskSlot.getJobId(); - Set<AllocationID> slots = slotsPerJob.get(jobId); + if (taskSlot.isEmpty()) { + // remove the allocation id to task slot mapping + allocatedSlots.remove(allocationId); - if (slots == null) { - throw new IllegalStateException("There are no more slots allocated for the job " + jobId + - ". This indicates a programming bug."); - } + // unregister a potential timeout + timerService.unregisterTimeout(allocationId); - slots.remove(allocationId); + Set<AllocationID> slots = slotsPerJob.get(jobId); - if (slots.isEmpty()) { - slotsPerJob.remove(jobId); - } + if (slots == null) { + throw new IllegalStateException("There are no more slots allocated for the job " + jobId + + ". 
This indicates a programming bug."); + } - taskSlot.close(); - taskSlots.remove(taskSlot.getIndex()); - budgetManager.release(taskSlot.getResourceProfile()); + slots.remove(allocationId); - return taskSlot.getIndex(); - } else { - // we couldn't free the task slot because it still contains task, fail the tasks - // and set the slot state to releasing so that it gets eventually freed - taskSlot.markReleasing(); + if (slots.isEmpty()) { + slotsPerJob.remove(jobId); + } - Iterator<Task> taskIterator = taskSlot.getTasks(); + taskSlots.remove(taskSlot.getIndex()); + budgetManager.release(taskSlot.getResourceProfile()); + taskSlot.close(); - while (taskIterator.hasNext()) { - taskIterator.next().failExternally(cause); - } + return taskSlot.getIndex(); + } else { + // we couldn't free the task slot because it still contains task, fail the tasks + // and set the slot state to releasing so that it gets eventually freed + taskSlot.markReleasing(); + + Iterator<Task> taskIterator = taskSlot.getTasks(); - return -1; + while (taskIterator.hasNext()) { + taskIterator.next().failExternally(cause); } - } else { - throw new SlotNotFoundException(allocationId); + + return -1; } } Review comment: Minor: This seems to be an irrelevant refactoring to me, which would be better in a separate hotfix commit. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services