[ 
https://issues.apache.org/jira/browse/FLINK-17921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei updated FLINK-17921:
---------------------------------
    Description: 
As described in summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} 
would cause akka.timeout. 

 
If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}} return 
{{null}} and used it in {{RpcGlobalAggregateManager#updateGlobalAggregate}} , 
this would cause following exception, this is not expected to happen from 
there. If we increase the {{akka.ask.timeout}} to another value, the exception 
is still in there. 

{code:java}
java.io.IOException: Error updating global aggregate.
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: 
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message 
of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
reason for `AskTimeoutException` is that the recipient actor didn't send a 
reply.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
... 8 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message 
of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
reason for `AskTimeoutException` is that the recipient actor didn't send a 
reply.
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
{code}

The following stacktrace would describe the root cause. We can see that 
{{CompletableFuture.waitingGet}}, it imply that the {{Completabilefuture}} will 
give the current thread to waiting, which will lead to the timeout of the akka 
communication of Flink. Therefore, even if the timeout is 1 hour, the problem 
cannot be solved. 

{code:java}
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007b76617a8> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}

We would make the {{RpcGlobalAggregateManager#updateGlobalAggregate}} use 
{{get(long timeout, TimeUnit unit)}} is a good choose. In that, The timeout 
information can truly reflect the current status of the program, 
{{akka.time.out}} error is too wide, which is not conducive to user 
troubleshooting.



  was:As described in summary, 
{{RpcGlobalAggregateManager#updateGlobalAggregate}} would cause akka.timeout. 


> RpcGlobalAggregateManager#updateGlobalAggregate would cause akka.timeout
> ------------------------------------------------------------------------
>
>                 Key: FLINK-17921
>                 URL: https://issues.apache.org/jira/browse/FLINK-17921
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: zhangminglei
>            Priority: Major
>
> As described in summary, {{RpcGlobalAggregateManager#updateGlobalAggregate}} 
> would cause akka.timeout. 
>  
> If {{org.apache.flink.api.common.functions.AggregateFunction#getResult}} 
> return {{null}} and used it in 
> {{RpcGlobalAggregateManager#updateGlobalAggregate}} , this would cause 
> following exception, this is not expected to happen from there. If we 
> increase the {{akka.ask.timeout}} to another value, the exception is still in 
> there. 
> {code:java}
> java.io.IOException: Error updating global aggregate.
> at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:47)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message 
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
> reason for `AskTimeoutException` is that the recipient actor didn't send a 
> reply.
> at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> ... 8 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/jobmanager_1#-1046052429]] after [10000 ms]. Message 
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
> reason for `AskTimeoutException` is that the recipient actor didn't send a 
> reply.
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
> {code}
> The following stacktrace would describe the root cause. We can see that 
> {{CompletableFuture.waitingGet}}, it imply that the {{Completabilefuture}} 
> will give the current thread to waiting, which will lead to the timeout of 
> the akka communication of Flink. Therefore, even if the timeout is 1 hour, 
> the problem cannot be solved. 
> {code:java}
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007b76617a8> (a 
> java.util.concurrent.CompletableFuture$Signaller)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager.updateGlobalAggregate(RpcGlobalAggregateManager.java:45)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1.run(AbstractFetcher.java:252)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> We would make the {{RpcGlobalAggregateManager#updateGlobalAggregate}} use 
> {{get(long timeout, TimeUnit unit)}} is a good choose. In that, The timeout 
> information can truly reflect the current status of the program, 
> {{akka.time.out}} error is too wide, which is not conducive to user 
> troubleshooting.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to