Hi Dominik:

I have also found a problem that may be your root cause.[1]
The JobConf in HadoopInputSplit may be very big, containing hundreds of
configuration entries. If it is serialized for every split, performance
drops significantly: with thousands of splits, the JobMaster's akka thread
will spend all its time serializing the conf, which may also lead to
various akka timeouts.
So your job may be failing because the JobMaster is busy serializing the
configuration.
I have created a patch to solve it; you can take a look and try it.[2]
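To make the cost above concrete, here is a minimal sketch using plain Java serialization rather than Flink's actual split-shipping code. It assumes each split is serialized on its own; `FakeSplit`, the key names and the sizes are all hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

public class SplitConfCost {

    // Hypothetical stand-in for an input split that may carry its own copy of a large config.
    static final class FakeSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;
        final HashMap<String, String> conf; // null when the conf is shipped separately

        FakeSplit(int splitNumber, HashMap<String, String> conf) {
            this.splitNumber = splitNumber;
            this.conf = conf;
        }
    }

    // Builds a config with `entries` key/value pairs, standing in for a JobConf with many settings.
    static HashMap<String, String> bigConf(int entries) {
        HashMap<String, String> conf = new HashMap<>();
        for (int i = 0; i < entries; i++) {
            conf.put("some.hadoop.key." + i, "some-value-" + i);
        }
        return conf;
    }

    // Serializes one object into its own byte array and returns the number of bytes written.
    static int serializedSize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Every split is serialized separately, so every split pays for a full copy of the conf.
    static long perSplitBytes(int splits, int confEntries) {
        HashMap<String, String> conf = bigConf(confEntries);
        long total = 0;
        for (int i = 0; i < splits; i++) {
            total += serializedSize(new FakeSplit(i, conf));
        }
        return total;
    }

    // The conf is serialized once; the splits only carry their own small state.
    static long sharedConfBytes(int splits, int confEntries) {
        long total = serializedSize(bigConf(confEntries));
        for (int i = 0; i < splits; i++) {
            total += serializedSize(new FakeSplit(i, null));
        }
        return total;
    }

    public static void main(String[] args) {
        int splits = 1000;
        int confEntries = 500;
        System.out.println("conf in every split: " + perSplitBytes(splits, confEntries) + " bytes");
        System.out.println("conf shipped once:   " + sharedConfBytes(splits, confEntries) + " bytes");
    }
}
```

In this model the first total grows with (number of splits x conf size) and the second with (conf size + number of splits), so avoiding a per-split copy of the conf matters more and more as the split count grows.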

[1] https://issues.apache.org/jira/browse/FLINK-14722
[2] https://github.com/JingsongLi/flink/commit/90c021ab8e7a175c6644c345e63383d828c415d7

Best,
Jingsong Lee

On Tue, Nov 12, 2019 at 6:49 PM Dominik Wosiński <wos...@gmail.com> wrote:

> Hey,
> I have increased the `akka.client.timeout` but it has not helped at all.
> Here is the log with the call stack for the AskTimeoutException:
>
> 2019-11-12 10:19:17,425 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher      -
> Received JobGraph submission 81fbbc3f41ad5e08ac832d0e656478bc (Flink
> Java Job at Tue Nov 12 10:04:19 UTC 2019).
> 2019-11-12 10:19:17,425 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher      -
> Submitting job 81fbbc3f41ad5e08ac832d0e656478bc (Flink Java Job at Tue
> Nov 12 10:04:19 UTC 2019).
> 2019-11-12 10:19:17,444 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService              -
> Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster
> at akka://flink/user/jobmanager_0 .
> 2019-11-12 10:19:17,452 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                  -
> Initializing job Flink Java Job at Tue Nov 12 10:04:19 UTC 2019
> (81fbbc3f41ad5e08ac832d0e656478bc).
> 2019-11-12 10:19:17,477 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                  - Using
> restart strategy NoRestartStrategy for Flink Java Job at Tue Nov 12
> 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
> 2019-11-12 10:19:17,495 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> recovers via failover strategy: full graph restart
> 2019-11-12 10:19:17,513 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                  -
> Running initialization on master for job Flink Java Job at Tue Nov 12
> 10:04:19 UTC 2019 (81fbbc3f41ad5e08ac832d0e656478bc).
> 2019-11-12 10:19:17,777 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                  -
> Successfully ran initialization on master in 264 ms.
> 2019-11-12 10:19:27,442 ERROR
> org.apache.flink.runtime.rest.handler.job.JobSubmitHandler    -
> Unhandled exception.
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-1507147991]] after [10000 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>         at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>         at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>         at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>         at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>         at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>         at java.lang.Thread.run(Thread.java:748)
>
>
> Thanks in Advance,
>
> Best Regards,
>
> Dom
>
>
> On Tue, Nov 12, 2019 at 3:26 AM tison <wander4...@gmail.com> wrote:
>
> > I suspect you are running into a client submission failure, which also
> > throws AskTimeoutException.
> >
> > The related configuration option is `akka.client.timeout`, which you can
> > increase. However, in some cases the problem could be resolved by
> > upgrading Java to a recent version (at least 8u212).
> >
> > Best,
> > tison.
> >
> >
> > On Mon, Nov 11, 2019 at 6:03 PM Zhu Zhu <reed...@gmail.com> wrote:
> >
> >> Hi Dominik,
> >>
> >> Could you check the GC status of the JM?
> >> One possible cause is that the large number of file metas in
> >> HadoopInputFormat is burdening the JM's memory.
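A small sketch of reading GC statistics through the standard `java.lang.management` API. It reports on the JVM it runs in, so to check the JobManager you would need the same numbers from the JM process itself (for example via JMX or GC logging); the class name `GcStatus` is hypothetical.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcStatus {

    // Sums the accumulated collection time over all collectors of this JVM.
    public static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 if this collector does not report it
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // One line per collector: its name, how often it ran, and how long it took in total.
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: %d collections, %d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
        System.out.println("total GC time: " + totalGcTimeMillis() + " ms");
    }
}
```

A total GC time that keeps climbing while the job is being submitted would point towards the memory pressure described above.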
> >>
> >> `akka.ask.timeout` is the default RPC timeout, while some RPCs may
> >> override this timeout for their own purposes; e.g., RPCs from the web
> >> frontend usually use `web.timeout`.
> >> Providing the detailed call stack of the AskTimeoutException may help to
> >> identify where this timeout happened.
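The point about per-path timeouts can be sketched as a tiny model. This is hypothetical code, not Flink's: `Origin`, `timeoutFor` and the 10 000 ms fallback are assumptions for illustration, and values are plain milliseconds for simplicity (Flink's real `akka.ask.timeout` takes duration strings such as `600 s`; check the configuration docs of your version for actual defaults).

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class TimeoutResolution {

    // Hypothetical call paths: an internal RPC, or an RPC issued by a REST handler.
    enum Origin { INTERNAL_RPC, REST_HANDLER }

    // Assumed fallback for both keys in this sketch.
    static final long ASSUMED_DEFAULT_MILLIS = 10_000;

    // Each call path reads its own key, so setting one key leaves the other path unchanged.
    static Duration timeoutFor(Origin origin, Map<String, String> userConf) {
        String key = (origin == Origin.REST_HANDLER) ? "web.timeout" : "akka.ask.timeout";
        String value = userConf.get(key);
        if (value == null) {
            return Duration.ofMillis(ASSUMED_DEFAULT_MILLIS);
        }
        return Duration.ofMillis(Long.parseLong(value));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("akka.ask.timeout", "600000"); // the model's equivalent of -yD akka.ask.timeout=600s
        System.out.println("internal RPC: " + timeoutFor(Origin.INTERNAL_RPC, conf).toMillis() + " ms");
        System.out.println("REST handler: " + timeoutFor(Origin.REST_HANDLER, conf).toMillis() + " ms");
    }
}
```

In this model, raising only `akka.ask.timeout` leaves the REST path at 10 000 ms. The `JobSubmitHandler` trace elsewhere in this thread, which gives up after [10000 ms], would at least be consistent with that pattern.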
> >>
> >> Thanks,
> >> Zhu Zhu
> >>
> >> On Mon, Nov 11, 2019 at 3:35 AM Dominik Wosiński <wos...@gmail.com> wrote:
> >>
> >> > Hey,
> >> > I have a very specific use case. I have a history of records stored as
> >> > Parquet in S3, and I would like to read and process them with Flink.
> >> > The issue is that the number of files is quite large (>100k). If I
> >> > provide the full list of files to the HadoopInputFormat I am using, the
> >> > job fails with AskTimeoutException. This is weird, since I am using
> >> > YARN and setting -yD akka.ask.timeout=600s; even though, according to
> >> > the logs, the setting is processed properly, the job execution still
> >> > fails with AskTimeoutException after 10s. I have managed to work around
> >> > this by grouping the files and reading them in a loop, so that I end up
> >> > with a Seq[DataSet<Record>]. But if I try to union those datasets, I
> >> > receive the AskTimeoutException again. So my question is: what could be
> >> > the reason behind this exception being thrown, and why is the setting
> >> > ignored even though it is parsed properly?
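The "group the files and read them in a loop" workaround boils down to chunking the path list. A minimal, framework-free sketch follows; `FileGroups`, `chunk` and the S3 paths are hypothetical, and the Flink side (one DataSet per group, then a union) is only indicated in a comment.

```java
import java.util.ArrayList;
import java.util.List;

public class FileGroups {

    // Splits a list into consecutive groups of at most groupSize elements.
    static <T> List<List<T>> chunk(List<T> items, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        List<List<T>> groups = new ArrayList<>();
        for (int start = 0; start < items.size(); start += groupSize) {
            int end = Math.min(start + groupSize, items.size());
            groups.add(new ArrayList<>(items.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            paths.add("s3://some-bucket/records/part-" + i + ".parquet"); // hypothetical paths
        }
        for (List<String> group : chunk(paths, 4)) {
            // In the workaround, each group would be read as its own DataSet.
            System.out.println(group.size() + " files, starting with " + group.get(0));
        }
    }
}
```

Grouping does not change the total number of files, and hence splits, that a single job ends up with after the union, so if the per-split cost is the culprit, the unioned job would be expected to hit the same limit.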
> >> >
> >> > I will be glad for any help.
> >> >
> >> > Best Regards,
> >> > Dom.
> >> >
> >>
> >
>


-- 
Best, Jingsong Lee
