Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5087#discussion_r155227397
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
---
    @@ -208,27 +209,61 @@ public void setLocality(Locality locality) {
        // 
------------------------------------------------------------------------
     
        @Override
    -   public void releaseSlot() {
    +   public void releaseInstanceSlot() {
    +           releaseSlot();
    +   }
    +
    +   @Override
    +   public CompletableFuture<?> releaseSlot() {
                if (!isCanceled()) {
    +                   final CompletableFuture<?> terminationFuture;
     
    -                   // kill all tasks currently running in this slot
    -                   Execution exec = this.executedTask;
    -                   if (exec != null && !exec.isFinished()) {
    -                           exec.fail(new Exception("TaskManager was 
lost/killed: " + getTaskManagerLocation()));
    -                   }
    +                   if (payload != null) {
    +                           // trigger the failure of the slot payload
    +                           payload.fail(new FlinkException("TaskManager 
was lost/killed: " + getTaskManagerLocation()));
     
    -                   // release directly (if we are directly allocated),
    -                   // otherwise release through the parent shared slot
    -                   if (getParent() == null) {
    -                           // we have to give back the slot to the owning 
instance
    -                           if (markCancelled()) {
    -                                   getOwner().returnAllocatedSlot(this);
    -                           }
    +                           // wait for the termination of the payload 
before releasing the slot
    +                           terminationFuture = 
payload.getTerminalStateFuture();
                        } else {
    -                           // we have to ask our parent to dispose us
    -                           getParent().releaseChild(this);
    +                           terminationFuture = 
CompletableFuture.completedFuture(null);
                        }
    +
    +                   terminationFuture.whenComplete(
    +                           (Object ignored, Throwable throwable) -> {
    +                                   // release directly (if we are directly 
allocated),
    +                                   // otherwise release through the parent 
shared slot
    +                                   if (getParent() == null) {
    +                                           // we have to give back the 
slot to the owning instance
    +                                           if (markCancelled()) {
    --- End diff --
    
    If `markCancelled` returns `false`, `releaseFuture` will never be 
completed. Is that intended? 


---

Reply via email to