xintongsong commented on a change in pull request #10682: 
[FLINK-15247][Runtime] Wait for all slots to be free before task executor 
services shutdown upon stopping
URL: https://github.com/apache/flink/pull/10682#discussion_r361275054
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 ##########
 @@ -403,54 +410,60 @@ public int freeSlot(AllocationID allocationId, Throwable 
cause) throws SlotNotFo
                TaskSlot taskSlot = getTaskSlot(allocationId);
 
                if (taskSlot != null) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Free slot {}.", taskSlot, cause);
-                       } else {
-                               LOG.info("Free slot {}.", taskSlot);
-                       }
+                       return freeSlot(taskSlot, cause);
+               } else {
+                       throw new SlotNotFoundException(allocationId);
+               }
+       }
 
-                       final JobID jobId = taskSlot.getJobId();
+       private int freeSlot(TaskSlot taskSlot, Throwable cause) {
+               AllocationID allocationId = taskSlot.getAllocationId();
 
-                       if (taskSlot.isEmpty()) {
-                               // remove the allocation id to task slot mapping
-                               allocatedSlots.remove(allocationId);
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Free slot {}.", taskSlot, cause);
+               } else {
+                       LOG.info("Free slot {}.", taskSlot);
+               }
 
-                               // unregister a potential timeout
-                               timerService.unregisterTimeout(allocationId);
+               final JobID jobId = taskSlot.getJobId();
 
-                               Set<AllocationID> slots = 
slotsPerJob.get(jobId);
+               if (taskSlot.isEmpty()) {
+                       // remove the allocation id to task slot mapping
+                       allocatedSlots.remove(allocationId);
 
-                               if (slots == null) {
-                                       throw new IllegalStateException("There 
are no more slots allocated for the job " + jobId +
-                                               ". This indicates a programming 
bug.");
-                               }
+                       // unregister a potential timeout
+                       timerService.unregisterTimeout(allocationId);
 
-                               slots.remove(allocationId);
+                       Set<AllocationID> slots = slotsPerJob.get(jobId);
 
-                               if (slots.isEmpty()) {
-                                       slotsPerJob.remove(jobId);
-                               }
+                       if (slots == null) {
+                               throw new IllegalStateException("There are no 
more slots allocated for the job " + jobId +
+                                       ". This indicates a programming bug.");
+                       }
 
-                               taskSlot.close();
-                               taskSlots.remove(taskSlot.getIndex());
-                               
budgetManager.release(taskSlot.getResourceProfile());
+                       slots.remove(allocationId);
 
-                               return taskSlot.getIndex();
-                       } else {
-                               // we couldn't free the task slot because it 
still contains task, fail the tasks
-                               // and set the slot state to releasing so that 
it gets eventually freed
-                               taskSlot.markReleasing();
+                       if (slots.isEmpty()) {
+                               slotsPerJob.remove(jobId);
+                       }
 
-                               Iterator<Task> taskIterator = 
taskSlot.getTasks();
+                       taskSlots.remove(taskSlot.getIndex());
+                       budgetManager.release(taskSlot.getResourceProfile());
+                       taskSlot.close();
 
 Review comment:
   Any special reason moving `taskSlot.close()` after `taskSlots.remove()` and 
`budgetManager.release`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to