Hello,

We're running Flink 1.9.2 on YARN and are seeing some interesting behavior when submitting jobs from multiple threads to an application's Flink cluster. The error reported in the client application logs is the following:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 8e1c2fdd68feee100d8fee003efef3e2)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
    ...
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    ... 3 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.

Looking through the JobManager logs, I see this interesting sequence for JobID
8e1c2fdd68feee100d8fee003efef3e2:
2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:07:25,843 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:26,109 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy NoRestartStrategy for Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:28,705 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://[email protected]:37966/user/jobmanager_124.
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) under job master id 00000000000000000000000000000000.
2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state CREATED to RUNNING.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager - Registering job manager [email protected]://[email protected]:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager - Registered job manager [email protected]://[email protected]:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, managedMemoryInMB=-1} for job 8e1c2fdd68feee100d8fee003efef3e2 with allocation id 5bca3bde577f93169928e04749b45343.
2021-01-20 14:08:45,199 INFO [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:09:19,981 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:09:19,982 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state RUNNING to FAILING.

It appears that the cluster somehow received the submission request for job 8e1c2fdd68feee100d8fee003efef3e2 twice. However, the client log shows only a single submission for this job:
2021-01-20 14:01:55,775 [ProductHistory-18359] INFO RestClusterClient - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (detached: false).

So although the client submits the job only once, the dispatcher somehow processes two submissions, and the second one fails. How can this happen?
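
One possible mechanism, offered only as a hypothesis to check: the client trace passes through `FutureUtils.retryOperationWithDelay`, so the REST client appears to retry the submission under some conditions. If a first request reached the dispatcher but the client stopped waiting for the response and resent the same JobGraph (same JobID), the dispatcher would see two submissions and reject the second with "Job has already been submitted." The plain-Java sketch below models only that scenario. `DuplicateSubmissionSketch`, `Dispatcher`, `submitWithRetry`, and the timeout/delay values are all hypothetical and are not Flink APIs; whether the 1.9.2 client actually retries in this situation is not confirmed here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of a retried, non-idempotent job submission (not Flink code). */
public class DuplicateSubmissionSketch {

    /** Hypothetical stand-in for the dispatcher: remembers every JobID and rejects repeats. */
    static final class Dispatcher {
        private final Set<String> submitted = ConcurrentHashMap.newKeySet();
        private final long responseDelayMs;

        Dispatcher(long responseDelayMs) {
            this.responseDelayMs = responseDelayMs;
        }

        String submit(String jobId) throws InterruptedException {
            // The JobID is registered before the (slow) response is sent back.
            if (!submitted.add(jobId)) {
                throw new IllegalStateException("Job has already been submitted.");
            }
            // Assumed slowness, e.g. a large upload or slow job initialization.
            Thread.sleep(responseDelayMs);
            return "accepted " + jobId;
        }
    }

    /** Hypothetical client: resends the same JobID if no response arrives within timeoutMs. */
    static String submitWithRetry(Dispatcher dispatcher, String jobId, long timeoutMs,
                                  int maxAttempts, ExecutorService io) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Future<String> response = io.submit(() -> dispatcher.submit(jobId));
            try {
                return response.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // The client gives up waiting, but the earlier request keeps running
                // on the "server" and has already registered the JobID.
                last = e;
            } catch (ExecutionException e) {
                // A rejection from the dispatcher is surfaced to the caller, not retried.
                Throwable cause = e.getCause();
                throw cause instanceof Exception ? (Exception) cause : e;
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io = Executors.newCachedThreadPool();
        try {
            Dispatcher dispatcher = new Dispatcher(500);
            String result = submitWithRetry(dispatcher, "8e1c2fdd68feee100d8fee003efef3e2", 100, 3, io);
            System.out.println("Client sees: " + result);
        } catch (IllegalStateException e) {
            System.out.println("Client sees: " + e.getMessage());
        } finally {
            io.shutdownNow();
        }
    }
}
```

With the delays chosen here, the first attempt is accepted but times out on the client, and the retry is rejected, which mirrors the client-side error above. If something similar is happening in practice, the client's REST retry and timeout settings would be the first place to look.
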
____________
Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.