[ 
https://issues.apache.org/jira/browse/FLINK-18663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166125#comment-17166125
 ] 

Till Rohrmann edited comment on FLINK-18663 at 7/28/20, 8:37 AM:
-----------------------------------------------------------------

[~trohrmann] The JobManager log is too large, so I removed some of the YARN and 
container log output.

{{AbstractHandler.terminationFuture}} never completes because 
{{AbstractHandler.inFlightRequestTracker}} cannot be cleared.

[^jobmanager.log.noyarn.tar.gz]

I added the log statements you asked for; you can filter for '#####'. The 
{{log.info("Shutting RestServerEndpoint down internally")}} statement is never 
reached because 
{{org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler}} has not closed yet.

{{org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler}} does not close 
because {{AbstractHandler.inFlightRequestTracker}} is never cleared, and that is 
caused by the exception logged while the job was switching from SCHEDULED to DEPLOYING:
{code:java}
2020-07-27 21:57:26,685 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler[flink-scheduler-1] - ##### handle exception for url /jobs/overview
2020-07-27 21:57:26,685 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler[flink-scheduler-1] - ##### handle exception for url /jobs/overview
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-88418157]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
  at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
  at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
  at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
  at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
  at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
  at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
  at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
  at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
  at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
  at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
  at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
  at java.lang.Thread.run(Thread.java:745)
2020-07-27 21:57:26,686 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [flink-akka.actor.default-dispatcher-48] - 1-1.1_Sink: Unnamed (533/1500) (1b0945713f48026b5c677a2d1559f78f) switched from SCHEDULED to DEPLOYING.
2020-07-27 21:57:26,686 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler[flink-scheduler-1] - ##### handleException happened exception
java.lang.NullPointerException
  at org.apache.flink.runtime.rest.handler.AbstractHandler.handleException(AbstractHandler.java:204)
  at org.apache.flink.runtime.rest.handler.AbstractHandler.lambda$respondAsLeader$1(AbstractHandler.java:182)
  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
  at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:872)
  at akka.dispatch.OnComplete.internal(Future.scala:263)
  at akka.dispatch.OnComplete.internal(Future.scala:261)
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
  at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
  at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
  at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
  at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
  at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
  at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
  at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
  at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
  at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
  at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
  at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
  at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
  at java.lang.Thread.run(Thread.java:745)
{code}
I am not sure why {{FlinkHttpObjectAggregator}} was null. However, 
{{FlinkHttpObjectAggregator}} is only used here to obtain maxContentLength, so 
why not pass {{RestHandlerConfiguration}} or 
{{RestServerEndpointConfiguration}} to AbstractHandler as a constructor 
parameter? Then we could read other config values as well, which feels more 
flexible to me.

On the other hand, this is rather redundant, since most of those parameters would not be used.
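
To make the proposal above concrete, here is a minimal sketch of a handler that receives the maximum content length through its constructor instead of looking it up in the channel pipeline. The class name {{ErrorTruncationSketch}}, the overhead value of 1024 and the hand-written truncation are assumptions for illustration only; they are not the Flink classes or the Guava {{Ascii.truncate}} call used in the real code.

```java
// Hypothetical stand-in for a handler; not the real AbstractHandler.
final class ErrorTruncationSketch {
    // Assumed value for illustration; the source does not state the real constant.
    private static final int OTHER_RESP_PAYLOAD_OVERHEAD = 1024;
    private static final String INDICATOR = "...";

    // Supplied once at construction time, so it can never be null later.
    private final int maxContentLength;

    ErrorTruncationSketch(int maxContentLength) {
        this.maxContentLength = maxContentLength;
    }

    // Shortens a stack trace so that it fits into the error response body.
    String truncate(String stackTrace) {
        int maxLength = Math.max(0, maxContentLength - OTHER_RESP_PAYLOAD_OVERHEAD);
        if (stackTrace.length() <= maxLength) {
            return stackTrace;
        }
        if (maxLength < INDICATOR.length()) {
            // Not enough room for the indicator, so cut hard.
            return stackTrace.substring(0, maxLength);
        }
        return stackTrace.substring(0, maxLength - INDICATOR.length()) + INDICATOR;
    }

    public static void main(String[] args) {
        ErrorTruncationSketch handler = new ErrorTruncationSketch(1034);
        System.out.println(handler.truncate("abcdefghijklmnop")); // prints "abcdefg..."
    }
}
```

Passing only the one value that is needed avoids the redundancy of handing over a whole configuration object, at the cost of a wider constructor signature.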

 


was (Author: tartarus):

> Fix Flink On YARN AM not exit
> -----------------------------
>
>                 Key: FLINK-18663
>                 URL: https://issues.apache.org/jira/browse/FLINK-18663
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.10.0, 1.10.1, 1.11.0
>            Reporter: tartarus
>            Assignee: tartarus
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: 110.png, 111.png, 
> C49A7310-F932-451B-A203-6D17F3140C0D.png, 
> e18e00dd6664485c2ff55284fe969474.png, jobmanager.log.noyarn.tar.gz
>
>
> AbstractHandler throws an NPE because FlinkHttpObjectAggregator is null.
> When a REST handler throws an exception, the following code runs:
> {code:java}
> private CompletableFuture<Void> handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) {
>     FlinkHttpObjectAggregator flinkHttpObjectAggregator = ctx.pipeline().get(FlinkHttpObjectAggregator.class);
>     int maxLength = flinkHttpObjectAggregator.maxContentLength() - OTHER_RESP_PAYLOAD_OVERHEAD;
>     if (throwable instanceof RestHandlerException) {
>         RestHandlerException rhe = (RestHandlerException) throwable;
>         String stackTrace = ExceptionUtils.stringifyException(rhe);
>         String truncatedStackTrace = Ascii.truncate(stackTrace, maxLength, "...");
>         if (log.isDebugEnabled()) {
>             log.error("Exception occurred in REST handler.", rhe);
>         } else {
>             log.error("Exception occurred in REST handler: {}", rhe.getMessage());
>         }
>         return HandlerUtils.sendErrorResponse(
>             ctx,
>             httpRequest,
>             new ErrorResponseBody(truncatedStackTrace),
>             rhe.getHttpResponseStatus(),
>             responseHeaders);
>     } else {
>         log.error("Unhandled exception.", throwable);
>         String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception on server side>",
>             ExceptionUtils.stringifyException(throwable));
>         String truncatedStackTrace = Ascii.truncate(stackTrace, maxLength, "...");
>         return HandlerUtils.sendErrorResponse(
>             ctx,
>             httpRequest,
>             new ErrorResponseBody(Arrays.asList("Internal server error.", truncatedStackTrace)),
>             HttpResponseStatus.INTERNAL_SERVER_ERROR,
>             responseHeaders);
>     }
> }
> {code}
> In some cases flinkHttpObjectAggregator is null, so this method throws an NPE. 
> The method is called from AbstractHandler#respondAsLeader:
> {code:java}
> requestProcessingFuture
>     .whenComplete((Void ignored, Throwable throwable) -> {
>         if (throwable != null) {
>             handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest)
>                 .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles));
>         } else {
>             finalizeRequestProcessing(finalUploadedFiles);
>         }
>     });
> {code}
> As a result, InFlightRequestTracker cannot be cleared, so the 
> CompletableFuture returned by the handler's closeAsync never completes.
> !C49A7310-F932-451B-A203-6D17F3140C0D.png!
> !e18e00dd6664485c2ff55284fe969474.png!
>  
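
The failure mode described in the issue can be reproduced outside Flink. The following is a minimal sketch, not the actual patch: {{InFlightRequestSketch}}, its counter and its methods are hypothetical stand-ins for the handler and its in-flight request tracking. It shows that an exception thrown synchronously inside a {{whenComplete}} callback is captured by the dependent future, so the cleanup chained after the throwing call never runs, and one possible guard against that.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the handler; none of these names are Flink API.
final class InFlightRequestSketch {
    // Number of requests that were registered but not yet finalized.
    final AtomicInteger inFlight = new AtomicInteger();

    // Mimics handleException: unboxing a null value throws an NPE.
    private CompletableFuture<Void> handleException(Integer maxContentLength) {
        int maxLength = maxContentLength - 1024;
        return CompletableFuture.completedFuture(null);
    }

    // Same shape as the respondAsLeader snippet: a throw skips the cleanup.
    int unguarded(Integer maxContentLength) {
        inFlight.set(1);
        CompletableFuture<Void> request = new CompletableFuture<>();
        request.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                handleException(maxContentLength)
                    .whenComplete((ignored2, throwable2) -> inFlight.decrementAndGet());
            } else {
                inFlight.decrementAndGet();
            }
        });
        request.completeExceptionally(new RuntimeException("ask timed out"));
        return inFlight.get();
    }

    // Guarded variant: a synchronous failure becomes a failed future,
    // so the cleanup stage runs on every path.
    int guarded(Integer maxContentLength) {
        inFlight.set(1);
        CompletableFuture<Void> request = new CompletableFuture<>();
        request.whenComplete((ignored, throwable) -> {
            CompletableFuture<Void> response;
            try {
                response = throwable != null
                    ? handleException(maxContentLength)
                    : CompletableFuture.completedFuture(null);
            } catch (Throwable t) {
                response = new CompletableFuture<>();
                response.completeExceptionally(t);
            }
            response.whenComplete((ignored2, throwable2) -> inFlight.decrementAndGet());
        });
        request.completeExceptionally(new RuntimeException("ask timed out"));
        return inFlight.get();
    }

    public static void main(String[] args) {
        InFlightRequestSketch sketch = new InFlightRequestSketch();
        System.out.println(sketch.unguarded(null)); // prints 1: request is stuck in flight
        System.out.println(sketch.guarded(null));   // prints 0: cleanup still ran
    }
}
```

In the unguarded variant nothing is rethrown to the caller of {{completeExceptionally}}, which matches the log above: the NPE is only visible because extra logging was added around the call.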



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
