Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my program
is here, in case it helps:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Every job except the first depends on files generated by the previous job.
I'm running the plan on an AWS EMR cluster using YARN.
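To make the dependency structure concrete, here is a minimal plain-Java sketch of how such a plan chains its jobs. It is hypothetical and not taken from R1DL.java: each `Stage` stands in for one Flink job (in a Flink DataSet program, typically a pipeline followed by its own `ExecutionEnvironment.execute()` call), reads the file written by the previous stage, and writes its own output. The names `Stage`, `runChain`, `doubler`, and `demo` are illustrative only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JobChainSketch {

    /** Hypothetical stand-in for one Flink job: reads one file and writes another. */
    interface Stage {
        void run(Path input, Path output) throws IOException;
    }

    /**
     * Runs the stages strictly in order; stage i reads the file written by stage i-1.
     * Fails fast if a stage's input file does not exist yet.
     */
    static List<Path> runChain(Path firstInput, List<Stage> stages, Path workDir) throws IOException {
        List<Path> outputs = new ArrayList<>();
        Path input = firstInput;
        for (int i = 0; i < stages.size(); i++) {
            if (!Files.exists(input)) {
                throw new IllegalStateException("Input for stage " + i + " is missing: " + input);
            }
            Path output = workDir.resolve("stage-" + i + ".csv");
            stages.get(i).run(input, output); // blocks until this stage has finished
            outputs.add(output);
            input = output;                   // the next stage consumes this file
        }
        return outputs;
    }

    /** Hypothetical example stage: doubles every number in its input file. */
    static Stage doubler() {
        return (in, out) -> {
            List<String> doubled = new ArrayList<>();
            for (String line : Files.readAllLines(in)) {
                doubled.add(Long.toString(2 * Long.parseLong(line.trim())));
            }
            Files.write(out, doubled);
        };
    }

    /** Runs a two-stage chain on the input 1, 2, 3 and returns the last stage's lines. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("chain");
            Path source = Files.write(dir.resolve("input.csv"), Arrays.asList("1", "2", "3"));
            List<Path> outs = runChain(source, Arrays.asList(doubler(), doubler()), dir);
            return Files.readAllLines(outs.get(outs.size() - 1));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [4, 8, 12]
    }
}
```

Because each stage blocks until it has finished, the next stage only starts once its input file exists. In the Flink plan, the equivalent moment is when the client submits the next job, which is the step that stalls.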

When I submit the plan file, the first job runs as planned. After it
completes, the second job is submitted by the YARN client:

<snip>
02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient  - Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient  - TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient  - All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient  - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@<snip>.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works
for me), the second job runs fine. However, if the input file is large and
the first job takes more than about a minute to complete, Flink never
acknowledges the next job: the Flink web console shows no new jobs, and the
Flink logs mention no new job submissions after the first job completes.
The YARN client's job submission then times out because Flink never
responds:

Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
	at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
	at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)

I have tried increasing akka.client.timeout to large values such as 1200s
(20 minutes), but Flink still neither acknowledges nor executes the remaining
jobs, and the same timeout error occurs. Does anyone know how I can get
Flink to execute all of the jobs properly?
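Why a larger timeout might not help can be modeled in a few lines of plain Java. This is a hypothetical sketch, not Flink's implementation (in Flink the timeout is raised in `JobSubmissionClientActor`, per the stack trace): a client waits up to a timeout for a submission acknowledgement. If the acknowledgement is only slow, a longer timeout fixes the problem; if it never arrives, every timeout eventually fails the same way, which is consistent with what I see even at 1200s. `submitAndWait` and its parameters are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubmissionTimeoutSketch {

    /**
     * Hypothetical model of a client waiting for a job-submission acknowledgement.
     * ackDelayMillis < 0 means the acknowledgement never arrives.
     * Returns true if the ack arrived within timeoutMillis, false on timeout.
     */
    static boolean submitAndWait(long ackDelayMillis, long timeoutMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> ack = new CompletableFuture<>();
            if (ackDelayMillis >= 0) {
                // The simulated "JobManager" acknowledges after a delay.
                scheduler.schedule(() -> { ack.complete("ACK"); }, ackDelayMillis, TimeUnit.MILLISECONDS);
            }
            // Otherwise nothing ever completes the future: the ack is lost.
            try {
                ack.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Slow acknowledgement: a longer timeout fixes it.
        System.out.println("slow ack, short timeout: " + submitAndWait(300, 100));
        System.out.println("slow ack, long timeout:  " + submitAndWait(300, 1000));
        // Missing acknowledgement: no timeout is long enough.
        System.out.println("no ack, short timeout:   " + submitAndWait(-1, 100));
        System.out.println("no ack, long timeout:    " + submitAndWait(-1, 1000));
    }
}
```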

Cheers,
Geoffrey Mon
