Hey Jingsong,
I will try to use the patch to verify.
In the meantime, I have run the job with -D akka.ask.timeout and -D
akka.client.timeout, both equal to 600s.
But the stacktrace is the same :

org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result. (JobID: 6f0adc8c58263ef83cb770285094bcba)

at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:471)

at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)

at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)

at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)

at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)

at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1862)

at
ai.humn.RawDataDrivingStateTransitionAnalysis$.addAnalysisPipeline(RawDataDrivingStateTransitionAnalysis.scala:59)

at
ai.humn.RawDataDrivingStateTransitionAnalysis$.main(RawDataDrivingStateTransitionAnalysis.scala:42)

at
ai.humn.RawDataDrivingStateTransitionAnalysis.main(RawDataDrivingStateTransitionAnalysis.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)

at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)

at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)

at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.

at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)

at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)

at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)

at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:207)

at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)

at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)

at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[Internal server error., <Exception on server side:

akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#-1403343453]] after [10000 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)

at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)

at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)

at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)

at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)

at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)

at java.lang.Thread.run(Thread.java:748)


End of exception on server side>]

at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)

at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)

at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)

at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)


wt., 12 lis 2019 o 12:15 Jingsong Li <jingsongl...@gmail.com> napisał(a):

> Hi Dominik:
>
> I found a problem too that it maybe your root cause.[1]
> JobConf in HadoopInputSplit may very big, contains hundreds of
> configurations, if it is serialized by every split, that will significantly
> reduce performance. Consider thousands of splits, the akka thread of
> JobMaster will all on the serialization of conf. That may will lead to
> various akka timeouts too.
> So the reason of your job failure may be that the JobMaster is busy
> serializing the configuration.
> I have created a patch to solve it, you can take a look and try.[2]
>
> [1] https://issues.apache.org/jira/browse/FLINK-14722
> [2]
>
> https://github.com/JingsongLi/flink/commit/90c021ab8e7a175c6644c345e63383d828c415d7
>
> Best,
> Jingsong Lee
>
> On Tue, Nov 12, 2019 at 6:49 PM Dominik Wosiński <wos...@gmail.com> wrote:
>
> > Hey,
> > I have increased the `akka.client.timeout` but it has not helped at all.
> > Here is the log with callstack for AskTimeoutException:
> >
> > 019-11-12 10:19:17,425 INFO
> > org.apache.flink.runtime.dispatcher.StandaloneDispatcher      -
> > Received JobGraph submission 81fbbc3f41ad5e08ac832d0e656478bc (Flink
> > Java Job at Tue Nov 12 10:04:19 UTC 2019).
> > 2019-11-12 10:19:17,425 INFO
> > org.apache.flink.runtime.dispatcher.StandaloneDispatcher      -
> > Submitting job 81fbbc3f41ad5e08ac832d0e656478bc (Flink Java Job at Tue
> > Nov 12 10:04:19 UTC 2019).
> > 2019-11-12 10:19:17,444 INFO
> > org.apache.flink.runtime.rpc.akka.AkkaRpcService              -
> > Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster
> > at akka://flink/user/jobmanager_0 .
> > 2019-11-12 10:19:17,452 INFO
> > org.apache.flink.runtime.jobmaster.JobMaster                  -
> > Initializing job Flink Java Job at Tue Nov 12 10:04:19 UTC 2019
> > (81fbbc3f41ad5e08ac832d0e656478bc).
> > 2019-11-12 10:19:17,477 INFO
> > org.apache.flink.runtime.jobmaster.JobMaster                  - Using
> > restart strategy NoRestartStrategy for Flink Java Job at Tue Nov 12
> > 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
> > 2019-11-12 10:19:17,495 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> > recovers via failover strategy: full graph restart
> > 2019-11-12 10:19:17,513 INFO
> > org.apache.flink.runtime.jobmaster.JobMaster                  -
> > Running initialization on master for job Flink Java Job at Tue Nov 12
> > 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
> > 2019-11-12 10:19:17,777 INFO
> > org.apache.flink.runtime.jobmaster.JobMaster                  -
> > Successfully ran initialization on master in 264 ms.
> > 2019-11-12 10:19:27,442 ERROR
> > org.apache.flink.runtime.rest.handler.job.JobSubmitHandler    -
> > Unhandled exception.
> > akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka://flink/user/dispatcher#-1507147991]] after [10000 ms].
> > Sender[null] sent message of type
> > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> >         at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> >         at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> >         at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> >         at
> >
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> >         at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> >         at
> >
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> >         at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> >         at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> >         at
> >
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> >         at java.lang.Thread.run(Thread.java:748)
> >
> >
> > Thanks in Advance,
> >
> > Best Regards,
> >
> > Dom
> >
> >
> > wt., 12 lis 2019 o 03:26 tison <wander4...@gmail.com> napisał(a):
> >
> > > I suspect you suffer from Client submission failure which also throws
> > > AskTimeoutException.
> > >
> > > The related configure option are `akka.client.timeout` which you can
> > > increase. However, there
> > > was some cases you can resolve the problem by upgrading Java to latest
> > > minimum version 8u212
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Zhu Zhu <reed...@gmail.com> 于2019年11月11日周一 下午6:03写道:
> > >
> > >> Hi Dominik,
> > >>
> > >> Would you check whether the JM GC status?
> > >> One possible cause is that the large number of file metas
> > >> inHadoopInputFormat is burdening the JM memory.
> > >>
> > >> `akka.ask.timeout` is the default RPC timeout, while some RPCs may
> > >> override
> > >> this timeout for their own purpose. e.g. the RPCs from web usually use
> > >> `web.timeout`.
> > >> Providing the detailed call stack of the AskTimeoutException may help
> to
> > >> identify where this timeout happened.
> > >>
> > >> Thanks,
> > >> Zhu Zhu
> > >>
> > >> Dominik Wosiński <wos...@gmail.com> 于2019年11月11日周一 上午3:35写道:
> > >>
> > >> > Hey,
> > >> > I have a very specific use case. I have a history of records stored
> as
> > >> > Parquet in S3. I would like to read and process them with Flink. The
> > >> issue
> > >> > is that the number of files is quite large ( >100k). If I provide
> the
> > >> full
> > >> > list of files to HadoopInputFormat that I am using it will fail with
> > >> > AskTimeoutException, which Is weird since I am using YARN and
> setting
> > >> the
> > >> > -yD akka.ask.timeout=600s, even thought according to the logs the
> > >> setting
> > >> > is processed properly, the job execution still with
> > AskTimeoutException
> > >> > after 10s, which seems weird to me. I have managed to go around
> this,
> > by
> > >> > grouping files and reading them in a loop, so that finally I have
> the
> > >> > Seq[DataSet<Record>]. But if I try to union those datasets, then I
> > will
> > >> > receive the AskTimeoutException again. So my question is, what can
> be
> > >> the
> > >> > reason behind this exception being thrown and why is the setting
> > >> ignored,
> > >> > even if this is pared properly.
> > >> >
> > >> > I will be glad for any help.
> > >> >
> > >> > Best Regards,
> > >> > Dom.
> > >> >
> > >>
> > >
> >
>
>
> --
> Best, Jingsong Lee
>

Reply via email to