xintongsong commented on a change in pull request #10682: 
[FLINK-15247][Runtime] Wait for all slots to be free before task executor 
services shutdown upon stopping
URL: https://github.com/apache/flink/pull/10682#discussion_r361274776
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 ##########
 @@ -403,54 +410,60 @@ public int freeSlot(AllocationID allocationId, Throwable 
cause) throws SlotNotFo
                TaskSlot taskSlot = getTaskSlot(allocationId);
 
                if (taskSlot != null) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Free slot {}.", taskSlot, cause);
-                       } else {
-                               LOG.info("Free slot {}.", taskSlot);
-                       }
+                       return freeSlot(taskSlot, cause);
+               } else {
+                       throw new SlotNotFoundException(allocationId);
+               }
+       }
 
-                       final JobID jobId = taskSlot.getJobId();
+       private int freeSlot(TaskSlot taskSlot, Throwable cause) {
+               AllocationID allocationId = taskSlot.getAllocationId();
 
-                       if (taskSlot.isEmpty()) {
-                               // remove the allocation id to task slot mapping
-                               allocatedSlots.remove(allocationId);
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Free slot {}.", taskSlot, cause);
+               } else {
+                       LOG.info("Free slot {}.", taskSlot);
+               }
 
-                               // unregister a potential timeout
-                               timerService.unregisterTimeout(allocationId);
+               final JobID jobId = taskSlot.getJobId();
 
-                               Set<AllocationID> slots = 
slotsPerJob.get(jobId);
+               if (taskSlot.isEmpty()) {
+                       // remove the allocation id to task slot mapping
+                       allocatedSlots.remove(allocationId);
 
-                               if (slots == null) {
-                                       throw new IllegalStateException("There 
are no more slots allocated for the job " + jobId +
-                                               ". This indicates a programming 
bug.");
-                               }
+                       // unregister a potential timeout
+                       timerService.unregisterTimeout(allocationId);
 
-                               slots.remove(allocationId);
+                       Set<AllocationID> slots = slotsPerJob.get(jobId);
 
-                               if (slots.isEmpty()) {
-                                       slotsPerJob.remove(jobId);
-                               }
+                       if (slots == null) {
+                               throw new IllegalStateException("There are no 
more slots allocated for the job " + jobId +
+                                       ". This indicates a programming bug.");
+                       }
 
-                               taskSlot.close();
-                               taskSlots.remove(taskSlot.getIndex());
-                               
budgetManager.release(taskSlot.getResourceProfile());
+                       slots.remove(allocationId);
 
-                               return taskSlot.getIndex();
-                       } else {
-                               // we couldn't free the task slot because it 
still contains task, fail the tasks
-                               // and set the slot state to releasing so that 
it gets eventually freed
-                               taskSlot.markReleasing();
+                       if (slots.isEmpty()) {
+                               slotsPerJob.remove(jobId);
+                       }
 
-                               Iterator<Task> taskIterator = 
taskSlot.getTasks();
+                       taskSlots.remove(taskSlot.getIndex());
+                       budgetManager.release(taskSlot.getResourceProfile());
+                       taskSlot.close();
 
-                               while (taskIterator.hasNext()) {
-                                       
taskIterator.next().failExternally(cause);
-                               }
+                       return taskSlot.getIndex();
+               } else {
+                       // we couldn't free the task slot because it still 
contains task, fail the tasks
+                       // and set the slot state to releasing so that it gets 
eventually freed
+                       taskSlot.markReleasing();
+
+                       Iterator<Task> taskIterator = taskSlot.getTasks();
 
-                               return -1;
+                       while (taskIterator.hasNext()) {
+                               taskIterator.next().failExternally(cause);
                        }
-               } else {
-                       throw new SlotNotFoundException(allocationId);
+
+                       return -1;
                }
        }
 
 Review comment:
   Minor: This seems to be an irrelevant refactoring to me, which would be 
better in a separate hotfix commit.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to