Hey Jingsong, I will try the patch to verify it. In the meantime, I have run the job with -D akka.ask.timeout and -D akka.client.timeout, both set to 600s, but the stack trace is the same:
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 6f0adc8c58263ef83cb770285094bcba)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
	at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
	at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
	at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1862)
	at ai.humn.RawDataDrivingStateTransitionAnalysis$.addAnalysisPipeline(RawDataDrivingStateTransitionAnalysis.scala:59)
	at ai.humn.RawDataDrivingStateTransitionAnalysis$.main(RawDataDrivingStateTransitionAnalysis.scala:42)
	at ai.humn.RawDataDrivingStateTransitionAnalysis.main(RawDataDrivingStateTransitionAnalysis.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:207)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1403343453]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
	at java.lang.Thread.run(Thread.java:748)
End of exception on server side>]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)

On Tue, 12 Nov 2019 at 12:15 Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi Dominik,
>
> I also found a problem that may be your root cause [1].
> The JobConf in HadoopInputSplit may be very big, containing hundreds of
> configuration entries. If it is serialized with every split, that will
> significantly reduce performance. Consider thousands of splits: the akka
> thread of the JobMaster will be fully occupied serializing the conf, which
> may also lead to various akka timeouts.
> So the reason for your job failure may be that the JobMaster is busy
> serializing the configuration.
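To get a feel for the effect Jingsong describes, the sketch below compares the bytes written when a configuration-sized map is serialized independently with every split against writing it once. It is a plain-Java illustration under stated assumptions, not Flink or Hadoop code: `SplitWithConf`, `SplitWithoutConf` and `fakeJobConf` are hypothetical, a `HashMap` stands in for the Hadoop `JobConf`, and the 500 entries and 1000 splits are arbitrary numbers chosen only to show the scaling.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

final class JobConfSerializationCost {

    // Hypothetical split that embeds its own copy of the job configuration.
    static final class SplitWithConf implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;
        final HashMap<String, String> conf;

        SplitWithConf(int splitNumber, HashMap<String, String> conf) {
            this.splitNumber = splitNumber;
            this.conf = conf;
        }
    }

    // Hypothetical split that carries only its own data; the configuration is shipped once elsewhere.
    static final class SplitWithoutConf implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;

        SplitWithoutConf(int splitNumber) {
            this.splitNumber = splitNumber;
        }
    }

    // Stand-in for a Hadoop JobConf: a map with `entries` key/value pairs.
    static HashMap<String, String> fakeJobConf(int entries) {
        HashMap<String, String> conf = new HashMap<>();
        for (int i = 0; i < entries; i++) {
            conf.put("fake.hadoop.property." + i, "some-configured-value-" + i);
        }
        return conf;
    }

    // Size in bytes of one object written to a fresh ObjectOutputStream.
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Every split is serialized on its own, so the configuration is written once per split.
    static long perSplitCost(int numSplits, HashMap<String, String> conf) {
        long total = 0;
        for (int i = 0; i < numSplits; i++) {
            total += serializedSize(new SplitWithConf(i, conf));
        }
        return total;
    }

    // The configuration is written once; each split then carries only its own data.
    static long sharedConfCost(int numSplits, HashMap<String, String> conf) {
        long total = serializedSize(conf);
        for (int i = 0; i < numSplits; i++) {
            total += serializedSize(new SplitWithoutConf(i));
        }
        return total;
    }

    public static void main(String[] args) {
        HashMap<String, String> conf = fakeJobConf(500);
        int numSplits = 1000;
        System.out.println("conf embedded in every split: " + perSplitCost(numSplits, conf) + " bytes");
        System.out.println("conf serialized once:         " + sharedConfCost(numSplits, conf) + " bytes");
    }
}
```

Because each split goes into its own `ObjectOutputStream`, Java serialization cannot share the map between splits, so the first total grows with the number of splits times the configuration size, while the second grows only with the number of splits.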
> I have created a patch to solve it; you can take a look and try it [2].
>
> [1] https://issues.apache.org/jira/browse/FLINK-14722
> [2] https://github.com/JingsongLi/flink/commit/90c021ab8e7a175c6644c345e63383d828c415d7
>
> Best,
> Jingsong Lee
>
> On Tue, Nov 12, 2019 at 6:49 PM Dominik Wosiński <wos...@gmail.com> wrote:
>
> > Hey,
> > I have increased the `akka.client.timeout`, but it has not helped at all.
> > Here is the log with the call stack for the AskTimeoutException:
> >
> > 2019-11-12 10:19:17,425 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 81fbbc3f41ad5e08ac832d0e656478bc (Flink Java Job at Tue Nov 12 10:04:19 UTC 2019).
> > 2019-11-12 10:19:17,425 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 81fbbc3f41ad5e08ac832d0e656478bc (Flink Java Job at Tue Nov 12 10:04:19 UTC 2019).
> > 2019-11-12 10:19:17,444 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
> > 2019-11-12 10:19:17,452 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
> > 2019-11-12 10:19:17,477 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy NoRestartStrategy for Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
> > 2019-11-12 10:19:17,495 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
> > 2019-11-12 10:19:17,513 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Flink Java Job at Tue Nov 12 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
> > 2019-11-12 10:19:17,777 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 264 ms.
> > 2019-11-12 10:19:27,442 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler - Unhandled exception.
> > akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-1507147991]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> > 	at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> > 	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> > 	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> > 	at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> > 	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> > 	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> > 	at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> > 	at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> > 	at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> > 	at java.lang.Thread.run(Thread.java:748)
> >
> > Thanks in advance,
> >
> > Best Regards,
> >
> > Dom
> >
> > On Tue, 12 Nov 2019 at 03:26 tison <wander4...@gmail.com> wrote:
> >
> > > I suspect you are suffering from a client submission failure, which also
> > > throws AskTimeoutException.
> > >
> > > The related configuration option is `akka.client.timeout`, which you can
> > > increase. However, there were some cases where the problem was resolved
> > > by upgrading Java to at least 8u212.
> > >
> > > Best,
> > > tison.
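tison's Java suggestion can be checked on every JVM involved in the submission (the CLI client and the JobManager), by running a small check with the same `java` binary each of them uses. A minimal sketch, assuming the usual Java 8 version format `1.8.0_NNN` (optionally followed by a build suffix such as `-b13`); `JavaUpdateCheck` and its methods are hypothetical helpers, and the 212 threshold is the one from tison's reply. Other version formats (Java 9+) are deliberately not judged.

```java
final class JavaUpdateCheck {

    private static final String JAVA8_PREFIX = "1.8.0_";

    // Extracts NNN from Java 8 version strings such as "1.8.0_212" or "1.8.0_212-b04".
    // Returns -1 for anything that is not in the Java 8 "1.8.0_NNN" format.
    static int java8Update(String version) {
        if (version == null || !version.startsWith(JAVA8_PREFIX)) {
            return -1;
        }
        String rest = version.substring(JAVA8_PREFIX.length());
        int end = 0;
        while (end < rest.length() && Character.isDigit(rest.charAt(end))) {
            end++;
        }
        return end == 0 ? -1 : Integer.parseInt(rest.substring(0, end));
    }

    // True only for Java 8 builds at update `minUpdate` or later; other formats return false.
    static boolean isJava8AtLeast(String version, int minUpdate) {
        int update = java8Update(version);
        return update >= 0 && update >= minUpdate;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        System.out.println("java.version = " + version
                + ", Java 8 update >= 212: " + isJava8AtLeast(version, 212));
    }
}
```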
> > >
> > >
> > > On Mon, 11 Nov 2019 at 18:03 Zhu Zhu <reed...@gmail.com> wrote:
> > >
> > >> Hi Dominik,
> > >>
> > >> Could you check the GC status of the JM?
> > >> One possible cause is that the large number of file metadata entries in
> > >> HadoopInputFormat is burdening the JM memory.
> > >>
> > >> `akka.ask.timeout` is the default RPC timeout, while some RPCs may
> > >> override this timeout for their own purposes, e.g. RPCs from the web
> > >> usually use `web.timeout`.
> > >> Providing the detailed call stack of the AskTimeoutException may help to
> > >> identify where this timeout happened.
> > >>
> > >> Thanks,
> > >> Zhu Zhu
> > >>
> > >> On Mon, 11 Nov 2019 at 03:35 Dominik Wosiński <wos...@gmail.com> wrote:
> > >>
> > >> > Hey,
> > >> > I have a very specific use case. I have a history of records stored as
> > >> > Parquet in S3, and I would like to read and process them with Flink. The
> > >> > issue is that the number of files is quite large (>100k). If I provide
> > >> > the full list of files to the HadoopInputFormat that I am using, the job
> > >> > fails with AskTimeoutException. This is weird, since I am running on YARN
> > >> > and setting -yD akka.ask.timeout=600s: according to the logs the setting
> > >> > is processed properly, yet the job execution still fails with
> > >> > AskTimeoutException after 10s. I have managed to work around this by
> > >> > grouping files and reading them in a loop, so that finally I have a
> > >> > Seq[DataSet[Record]]. But if I try to union those datasets, I receive
> > >> > the AskTimeoutException again. So my question is: what could be the
> > >> > reason behind this exception, and why is the setting ignored even
> > >> > though it is parsed properly?
> > >> >
> > >> > I will be glad for any help.
> > >> >
> > >> > Best Regards,
> > >> > Dom.
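Zhu Zhu's explanation may also account for the ignored setting: the deadline of an ask is whatever timeout the caller hands over, and the failing ask in the logs is issued by `JobSubmitHandler`, a REST handler, with 10000 ms, which would explain why raising `akka.ask.timeout` made no difference; per Zhu Zhu, web-originated RPCs usually take `web.timeout`. The sketch below is a generic Java model of that behaviour, not Flink's implementation: `CallerSideTimeout`, `ask` and `simulate` are hypothetical, the busy receiver stands in for the dispatcher/JobMaster, and the durations are scaled down from 10 s to keep it quick.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallerSideTimeout {

    // Completes with the reply of `work`, or fails with a TimeoutException once `timeoutMs`
    // has elapsed, whichever happens first. Only the caller's timeoutMs is consulted.
    static <T> CompletableFuture<T> ask(Callable<T> work, long timeoutMs,
                                        ExecutorService worker, ScheduledExecutorService timer) {
        CompletableFuture<T> reply = new CompletableFuture<>();
        worker.execute(() -> {
            try {
                reply.complete(work.call());
            } catch (Exception e) {
                reply.completeExceptionally(e);
            }
        });
        ScheduledFuture<?> deadline = timer.schedule(() -> {
            reply.completeExceptionally(new TimeoutException("Ask timed out after [" + timeoutMs + " ms]"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        // Once a reply (or failure) arrives, the pending deadline is no longer needed.
        reply.whenComplete((value, error) -> deadline.cancel(false));
        return reply;
    }

    // Simulates a busy receiver that needs `busyMs` to answer, asked with `timeoutMs`.
    // Returns "ok" or "timeout".
    static String simulate(long busyMs, long timeoutMs) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            return ask(() -> { Thread.sleep(busyMs); return "ok"; }, timeoutMs, worker, timer).join();
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException ? "timeout" : "failed: " + e.getCause();
        } finally {
            worker.shutdownNow();
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A raised default (think -yD akka.ask.timeout=600s) is never passed to ask() here ...
        long raisedDefaultMs = 600_000;
        // ... because this caller uses its own, shorter timeout (scaled down from 10000 ms).
        long callerTimeoutMs = 100;
        System.out.println("default of " + raisedDefaultMs + " ms, caller passes " + callerTimeoutMs
                + " ms: " + simulate(500, callerTimeoutMs));
        System.out.println("caller passes a larger timeout: " + simulate(500, 2_000));
    }
}
```

In this model the only knob that helps is the one the caller reads, which is why it matters to find out, from the call stack, which component issued the ask.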