Failed to cancel per-job cluster because deregisterApplication is not called

2021-03-23 刘建刚
  I am using Flink 1.10.0, and my per-job cluster cannot be cancelled. From the
log, I can see that webMonitorEndpoint.closeAsync() completes, but
deregisterApplication is never called. The relevant code is as follows:

public CompletableFuture<Void> deregisterApplicationAndClose(
        final ApplicationStatus applicationStatus,
        final @Nullable String diagnostics) {

    if (isRunning.compareAndSet(true, false)) {
        final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
            FutureUtils.composeAfterwards(
                webMonitorEndpoint.closeAsync(),
                () -> deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId()));

        return FutureUtils.composeAfterwards(
            closeWebMonitorAndDeregisterAppFuture,
            this::closeAsyncInternal);
    } else {
        return terminationFuture;
    }
}
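The ordering this method relies on can be illustrated with plain CompletableFuture. The sketch below is a simplified, hypothetical stand-in for FutureUtils.composeAfterwards (the class ComposeAfterwardsSketch and its methods are made up for illustration and are not Flink's source), assuming composeAfterwards runs its second action once the first future completes, normally or exceptionally. Under that assumption, deregisterApplication can only start after the future returned by webMonitorEndpoint.closeAsync() has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for FutureUtils.composeAfterwards; not Flink's source.
final class ComposeAfterwardsSketch {

    // Runs `next` once `first` completes (normally or exceptionally). The returned
    // future completes after the future produced by `next` completes.
    static CompletableFuture<Void> composeAfterwards(
            CompletableFuture<?> first, Supplier<CompletableFuture<?>> next) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        first.whenComplete((ignoredValue, firstError) -> {
            CompletableFuture<?> nextFuture;
            try {
                nextFuture = next.get();
            } catch (Throwable t) {
                // Otherwise the exception would only reach the discarded stage
                // returned by whenComplete, and `result` would never complete.
                result.completeExceptionally(t);
                return;
            }
            nextFuture.whenComplete((ignoredNext, nextError) -> {
                if (nextError != null) {
                    result.completeExceptionally(nextError);
                } else if (firstError != null) {
                    result.completeExceptionally(firstError);
                } else {
                    result.complete(null);
                }
            });
        });
        return result;
    }

    // Models the chain in deregisterApplicationAndClose with recorded step names.
    static List<String> chainSteps(CompletableFuture<Void> webMonitorClosed) {
        List<String> steps = new ArrayList<>();
        CompletableFuture<Void> deregistered = composeAfterwards(webMonitorClosed, () -> {
            steps.add("deregisterApplication");
            return CompletableFuture.<Void>completedFuture(null);
        });
        composeAfterwards(deregistered, () -> {
            steps.add("closeAsyncInternal");
            return CompletableFuture.<Void>completedFuture(null);
        });
        return steps;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> webMonitorClosed = new CompletableFuture<>();
        List<String> steps = chainSteps(webMonitorClosed);
        System.out.println("before close completes: " + steps); // []
        webMonitorClosed.complete(null);
        System.out.println("after close completes: " + steps);  // [deregisterApplication, closeAsyncInternal]
    }
}
```

In this sketch, if the first future never completes, neither later step ever runs, so the deregistration log would stay silent without any error being reported.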

  For webMonitorEndpoint.closeAsync(), the code is as follows:

public CompletableFuture<Void> closeAsync() {
    synchronized (lock) {
        log.info("State is {}. Shutting down rest endpoint.", state);

        if (state == State.RUNNING) {
            final CompletableFuture<Void> shutDownFuture =
                FutureUtils.composeAfterwards(
                    closeHandlersAsync(),
                    this::shutDownInternal);

            shutDownFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    log.info("Shut down complete.");
                    if (throwable != null) {
                        terminationFuture.completeExceptionally(throwable);
                    } else {
                        terminationFuture.complete(null);
                    }
                });
            state = State.SHUTDOWN;
        } else if (state == State.CREATED) {
            terminationFuture.complete(null);
            state = State.SHUTDOWN;
        }

        return terminationFuture;
    }
}
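The state handling in closeAsync() can be modelled in isolation. In this hypothetical sketch (RestEndpointSketch and its fields are illustrative names), shutdownWork stands in for closeHandlersAsync() followed by shutDownInternal(). It shows that in the RUNNING state the returned terminationFuture completes only when that work completes, and that later calls return the same future without starting the shutdown again.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, stripped-down model of the closeAsync() state handling; not Flink's class.
final class RestEndpointSketch {
    enum State { CREATED, RUNNING, SHUTDOWN }

    private final Object lock = new Object();
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private final CompletableFuture<Void> shutdownWork; // stands in for handlers + internal shutdown
    private State state;
    int shutdownStarts = 0; // how often the shutdown work was hooked up

    RestEndpointSketch(State initial, CompletableFuture<Void> shutdownWork) {
        this.state = initial;
        this.shutdownWork = shutdownWork;
    }

    CompletableFuture<Void> closeAsync() {
        synchronized (lock) {
            if (state == State.RUNNING) {
                shutdownStarts++;
                // terminationFuture is completed only from this callback.
                shutdownWork.whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        terminationFuture.completeExceptionally(throwable);
                    } else {
                        terminationFuture.complete(null);
                    }
                });
                state = State.SHUTDOWN;
            } else if (state == State.CREATED) {
                terminationFuture.complete(null);
                state = State.SHUTDOWN;
            }
            // In SHUTDOWN nothing new happens; every caller gets the same future.
            return terminationFuture;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> work = new CompletableFuture<>();
        RestEndpointSketch endpoint = new RestEndpointSketch(State.RUNNING, work);
        CompletableFuture<Void> first = endpoint.closeAsync();
        CompletableFuture<Void> second = endpoint.closeAsync();
        System.out.println(first == second);         // true
        System.out.println(first.isDone());          // false
        work.complete(null);
        System.out.println(first.isDone());          // true
        System.out.println(endpoint.shutdownStarts); // 1
    }
}
```

Under these assumptions, "Shutting down rest endpoint." is logged on every call and says nothing about completion; only the callback on the shutdown future does.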

  I am sure that it completes, based on the log message I added:
[image: screenshot of the added log output]

 For deregisterApplication, however, I do not see any related log message,
such as "Shut down cluster because application is in {}, diagnostics {}."
  Can anyone give me some suggestions? Thank you.


Re: Failed to cancel per-job cluster because deregisterApplication is not called

2021-03-26 Chesnay Schepler

Where exactly did you add your own log message?

WebMonitorEndpoint.closeAsync() already logs on its own whether the
shutdown future was completed, so it shouldn't have been necessary to
add a separate log message.
If you now only see the one you added, chances are that it was added in
the wrong place.
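One way to check this is to log on the exact future the caller composes on, rather than inside the endpoint. The helper below is hypothetical (CallSiteLogging.logCompletion is not a Flink API); it attaches a callback and returns the original future unchanged, so wrapping webMonitorEndpoint.closeAsync() with it at the call site in deregisterApplicationAndClose would not change the shutdown sequence. The Consumer<String> parameter stands in for whatever logger is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical debugging helper; not part of Flink.
final class CallSiteLogging {

    // Logs when `future` completes and returns the same future, so the caller
    // keeps composing on exactly the object whose completion is being logged.
    static <T> CompletableFuture<T> logCompletion(
            String name, CompletableFuture<T> future, Consumer<String> log) {
        future.whenComplete((value, throwable) -> {
            if (throwable != null) {
                log.accept(name + " completed exceptionally: " + throwable);
            } else {
                log.accept(name + " completed normally");
            }
        });
        return future;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        logCompletion("webMonitorEndpoint.closeAsync()", closeFuture, lines::add);
        System.out.println(lines); // []
        closeFuture.complete(null);
        System.out.println(lines); // [webMonitorEndpoint.closeAsync() completed normally]
    }
}
```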


On 3/24/2021 5:06 AM, 刘建刚 wrote:
> I am using Flink 1.10.0, and my per-job cluster cannot be cancelled.





Re: Failed to cancel per-job cluster because deregisterApplication is not called

2021-05-21 刘建刚
Thank you for the answer.

I ran into the same problem again. I added a log statement to
RestServerEndpoint's closeAsync method as follows:

@Override
public CompletableFuture<Void> closeAsync() {
    synchronized (lock) {
        log.info("State is {}. Shutting down rest endpoint.", state);

        if (state == State.RUNNING) {
            final CompletableFuture<Void> shutDownFuture =
                FutureUtils.composeAfterwards(
                    closeHandlersAsync(),
                    this::shutDownInternal);

            shutDownFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    if (throwable != null) {
                        terminationFuture.completeExceptionally(throwable);
                    } else {
                        terminationFuture.complete(null);
                    }
                    log.info("Shut down complete with {}.", terminationFuture);
                });
            state = State.SHUTDOWN;
        } else if (state == State.CREATED) {
            terminationFuture.complete(null);
            state = State.SHUTDOWN;
        }

        return terminationFuture;
    }
}
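In this instrumented version, the log statement runs after terminationFuture.complete(null). With CompletableFuture, non-async callbacks already registered on a future run on the completing thread inside complete(), before the next statement executes. The self-contained demo below (CompletionOrderDemo is an illustrative name) shows that ordering. Assuming composeAfterwards registers such a non-async callback on the future returned by closeAsync(), the next step of deregisterApplicationAndClose would already have been triggered by the time "Shut down complete with ..." is written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative only: shows when an already-registered, non-async callback runs.
final class CompletionOrderDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        // Stands in for the callback a caller registers on the future returned
        // by closeAsync() before the shutdown finishes.
        terminationFuture.whenComplete((ignored, throwable) -> events.add("dependent callback"));

        // Mirrors the instrumented closeAsync(): complete the future, then log.
        terminationFuture.complete(null);
        events.add("log: Shut down complete");
        return events;
    }

    public static void main(String[] args) {
        // Prints [dependent callback, log: Shut down complete]
        System.out.println(run());
    }
}
```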

After closeAsync completes, DispatcherResourceManagerComponent is expected
to call its deregisterApplication method from deregisterApplicationAndClose,
shown here:

public CompletableFuture<Void> deregisterApplicationAndClose(
        final ApplicationStatus applicationStatus,
        final @Nullable String diagnostics) {

    if (isRunning.compareAndSet(true, false)) {
        final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
            FutureUtils.composeAfterwards(
                webMonitorEndpoint.closeAsync(),
                () -> deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId()));

        return FutureUtils.composeAfterwards(
            closeWebMonitorAndDeregisterAppFuture,
            this::closeAsyncInternal);
    } else {
        return terminationFuture;
    }
}

However, the ResourceManager's deregisterApplication method is never executed,
and I do not know why. Any suggestions?
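One branch worth ruling out is the isRunning guard: deregisterApplicationAndClose only closes the web monitor and deregisters if compareAndSet(true, false) succeeds; otherwise it just returns terminationFuture. The hypothetical sketch below models that guard. GuardedComponentSketch is an illustrative name, and closeWithoutDeregister() is an assumed second shutdown path that flips the same flag; whether such a path ran first in this cluster is not something the thread shows.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the isRunning guard; not Flink's DispatcherResourceManagerComponent.
final class GuardedComponentSketch {
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    int deregisterCalls = 0; // how often the close-and-deregister branch ran

    CompletableFuture<Void> deregisterApplicationAndClose() {
        if (isRunning.compareAndSet(true, false)) {
            // Stands in for closing the web monitor, then calling deregisterApplication(...).
            deregisterCalls++;
            terminationFuture.complete(null);
        }
        // If the flag was already false, nothing is deregistered; the caller only
        // receives the shared termination future.
        return terminationFuture;
    }

    // Assumed second shutdown path that flips the same flag without deregistering.
    CompletableFuture<Void> closeWithoutDeregister() {
        if (isRunning.compareAndSet(true, false)) {
            terminationFuture.complete(null);
        }
        return terminationFuture;
    }

    public static void main(String[] args) {
        GuardedComponentSketch component = new GuardedComponentSketch();
        component.closeWithoutDeregister();            // wins the compareAndSet
        component.deregisterApplicationAndClose();     // sees false, skips deregistration
        System.out.println(component.deregisterCalls); // 0
    }
}
```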




On Fri, Mar 26, 2021 at 6:54 PM, Chesnay Schepler [via Apache Flink User Mailing List archive.] <
ml+s2336050n42578...@n4.nabble.com> wrote:

> Where exactly did you add your own log message?