[ 
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115882#comment-17115882
 ] 

zhangminglei commented on FLINK-17921:
--------------------------------------

cc [~jgrier] Could you please take a look at this? Thank you very much.

> RpcGlobalAggregateManager#updateGlobalAggregate would cause unexpected 
> akka.timeout
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-17921
>                 URL: https://issues.apache.org/jira/browse/FLINK-17921
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.8.1, 1.10.1
>            Reporter: zhangminglei
>            Priority: Major
>
> As described in the summary, 
> {{RpcGlobalAggregateManager#updateGlobalAggregate}} can fail with an akka 
> timeout, but that is not the error message we want.
> If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}} 
> returns {{null}} and that value is used in 
> {{RpcGlobalAggregateManager#updateGlobalAggregate}}, the following 
> exception is thrown, which is not expected at that point. Increasing 
> {{akka.ask.timeout}} to a larger value does not make the exception go away.
> {code:java}
> java.io.IOException: Error updating global aggregate.
> at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message 
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
> reason for `AskTimeoutException` is that the recipient actor didn't send a 
> reply.
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> ... 8 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message 
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
> reason for `AskTimeoutException` is that the recipient actor didn't send a 
> reply.
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> {code}
> The following stack trace shows the root cause. The thread is parked in 
> {{CompletableFuture.waitingGet}}, which implies that the 
> {{CompletableFuture}} puts the calling thread into a waiting state, and this 
> leads to the timeout of Flink's akka communication. Therefore, even if the 
> timeout is set to 1 hour, the problem cannot be solved.
> {code:java}
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007b76617a8> (a 
> java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Making {{RpcGlobalAggregateManager#updateGlobalAggregate}} use 
> {{get(long timeout, TimeUnit unit)}} would be a good choice. That way, the 
> timeout information can truly reflect the current status of the program. The 
> {{akka.time.out}} error is too broad, which is not conducive to user 
> troubleshooting.
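
The proposal to use {{get(long timeout, TimeUnit unit)}} could look roughly like the sketch below. It is only an illustration under assumptions: the class name TimedAggregateUpdate, the helper awaitResult, and the aggregate name "demo" are hypothetical and not part of Flink, and the sketch does not reproduce the real RpcGlobalAggregateManager code. Only the java.util.concurrent calls are real. The idea is to bound the wait on the caller side and report a message that names the aggregate being updated.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedAggregateUpdate {

    // Hypothetical helper (not Flink API): waits for a result for a bounded
    // time and turns each failure mode into a descriptive IOException.
    static <T> T awaitResult(CompletableFuture<T> future, String aggregateName,
                             long timeout, TimeUnit unit) throws IOException {
        try {
            // Timed variant of get(): throws TimeoutException instead of
            // parking the calling thread indefinitely.
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new IOException("Global aggregate '" + aggregateName
                    + "' was not updated within " + timeout + " " + unit + ".", e);
        } catch (ExecutionException e) {
            // The future completed exceptionally; surface the underlying cause.
            throw new IOException("Error updating global aggregate '"
                    + aggregateName + "'.", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while updating global aggregate '"
                    + aggregateName + "'.", e);
        }
    }

    public static void main(String[] args) throws IOException {
        // A future that never completes stands in for an RPC without a reply.
        CompletableFuture<Object> neverCompleted = new CompletableFuture<>();
        try {
            awaitResult(neverCompleted, "demo", 100, TimeUnit.MILLISECONDS);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }

        // An already completed future returns its value immediately.
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        System.out.println(awaitResult(done, "demo", 100, TimeUnit.MILLISECONDS));
    }
}
```

With this shape the caller sees which aggregate timed out and after how long, rather than a generic ask timeout. Whether the timeout should come from an existing configuration option or a new one is left open here.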



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
