Hi Robert, I appreciate you taking a look. I’ll dig into our client side 
further and see what I can find. Thanks!


// ah

From: Robert Metzger <rmetz...@apache.org>
Sent: Friday, January 22, 2021 2:41 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: org.apache.flink.runtime.client.JobSubmissionException: Job has 
already been submitted

Hey Andreas,
thanks a lot for providing me with the full logs.

The JobManager actually received 2 job submissions.
There are 2 relevant log messages.
1. "Received JobGraph submission xxx"
2. "Submitting job"
1 is logged right after the dispatcher receives the JobGraph, before the 
duplicate submission check is done. 2 is logged once we know that there is no 
duplicate job.
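
The ordering of the two messages can be sketched as follows. This is a minimal illustration only, not Flink's actual Dispatcher code: the class name DispatcherSketch, the in-memory set of job IDs, and the boolean return value are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; NOT Flink's Dispatcher implementation.
public class DispatcherSketch {
    // Job IDs that have already been accepted (assumed in-memory bookkeeping).
    private final Set<String> knownJobIds = new HashSet<>();
    // Collected log lines, so the ordering can be inspected.
    private final List<String> log = new ArrayList<>();

    /** Returns true if the job was accepted, false if it was rejected as a duplicate. */
    public boolean submitJob(String jobId) {
        // Message 1: logged on receipt, before the duplicate check.
        log.add("Received JobGraph submission " + jobId);
        if (!knownJobIds.add(jobId)) {
            // Duplicate: rejected here, so message 2 is never logged.
            return false;
        }
        // Message 2: logged only once the job is known not to be a duplicate.
        log.add("Submitting job " + jobId);
        return true;
    }

    public List<String> getLog() {
        return log;
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch();
        String jobId = "8e1c2fdd68feee100d8fee003efef3e2";
        dispatcher.submitJob(jobId); // accepted
        dispatcher.submitJob(jobId); // rejected as a duplicate
        dispatcher.getLog().forEach(System.out::println);
    }
}
```

Under these assumptions, a second submission of the same ID produces message 1 again but never message 2.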

We have the following log messages (which you already posted here on the 
list):

TYPE 1: 2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received 
JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 
20 14:01:42 EST 2021).
TYPE 2: 2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job 
8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 
2021).
TYPE 1: 2021-01-20 14:08:45,199 INFO  [flink-akka.actor.default-dispatcher-30] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received 
JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 
20 14:01:42 EST 2021).

2021-01-20 14:09:19,981 INFO  [flink-akka.actor.default-dispatcher-90] 
org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the 
JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 
2021(8e1c2fdd68feee100d8fee003efef3e2).


So at 14:06 you submit the job, and at 14:09 it fails.

In between (14:08) you are trying to submit the job again, which gets 
(rightfully) rejected. It seems that the second submission didn't get logged 
properly in your client.
I don't think there's a bug on Flink's side of things.
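
One way to check this kind of timeline against a full JobManager log is to count the two message types per job ID. The following is a minimal sketch, assuming each log entry sits on a single line in the log file (the entries quoted above are wrapped by the mail client). The class and method names are hypothetical.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for inspecting a JobManager log; not part of Flink.
public class SubmissionLogCounter {
    // Message 1: logged before the duplicate check.
    private static final Pattern RECEIVED =
            Pattern.compile("Received JobGraph submission ([0-9a-f]{32})");
    // Message 2: logged only when the job is not a duplicate.
    private static final Pattern SUBMITTING =
            Pattern.compile("Submitting job ([0-9a-f]{32})");

    /** Returns {receivedCount, submittingCount} for the given job ID. */
    public static int[] count(List<String> lines, String jobId) {
        int received = 0;
        int submitting = 0;
        for (String line : lines) {
            Matcher r = RECEIVED.matcher(line);
            if (r.find() && r.group(1).equals(jobId)) {
                received++;
            }
            Matcher s = SUBMITTING.matcher(line);
            if (s.find() && s.group(1).equals(jobId)) {
                submitting++;
            }
        }
        return new int[] {received, submitting};
    }
}
```

More "Received" than "Submitting" entries for one job ID would indicate that a later submission was rejected as a duplicate. Note that the client's RestClusterClient also logs a "Submitting job" line, so this is meant to be run on the JobManager log only.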


On Thu, Jan 21, 2021 at 7:17 PM Hailu, Andreas [Engineering] 
<andreas.ha...@gs.com> wrote:
Hi Robert,

I sent you an email with instructions to create an account to view the logs 
through our secure repository. I’ve included the JobManager and client 
application logs there.

We have a thread pool that we use to submit multiple jobs in parallel, but 
there’s no retry logic in it: if any one job fails, it’s an overall failure 
for the entire application. As for the timespan between when the client app 
logged the job as initially submitted and when the JobManager logged it as 
received: we’re submitting a large number of jobs as part of this 
application. Is it possible that the JobManager is busy processing other 
jobs?
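
For reference, the submission pattern described above looks roughly like the sketch below. It is a hypothetical illustration rather than the application's actual code: the class name, the fixed-size pool, and the boolean result are assumptions. Each task is attempted exactly once, and a single failed task makes the whole run fail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of parallel job submission without retries.
public class ParallelSubmitSketch {
    /** Runs all submissions in parallel; returns true only if every one succeeds. */
    public static boolean submitAll(List<Callable<Void>> submissions, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (Callable<Void> submission : submissions) {
                // Each submission is handed to the pool exactly once.
                futures.add(pool.submit(submission));
            }
            boolean allSucceeded = true;
            for (Future<Void> future : futures) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    // No retry: one failed job marks the whole run as failed.
                    allSucceeded = false;
                }
            }
            return allSucceeded;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the run as failed.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```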

// ah

From: Robert Metzger <rmetz...@apache.org>
Sent: Thursday, January 21, 2021 10:00 AM
To: Hailu, Andreas [Engineering] 
<andreas.ha...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: org.apache.flink.runtime.client.JobSubmissionException: Job has 
already been submitted

Thanks a lot for your message.

Why is there a difference of 5 minutes between the timestamp of the job 
submission from the client to the timestamp on the JobManager where the 
submission is received?
Is there any service / custom logic involved in the job submission? (e.g. a 
proxy in between, that has some retry mechanism, or some custom code that does 
retries?)

Could you provide the full JobManager logs of that timeframe, not just those 
messages filtered for 8e1c2fdd68feee100d8fee003efef3e2?

On Wed, Jan 20, 2021 at 10:20 PM Hailu, Andreas [Engineering] 
<andreas.ha...@gs.com> wrote:
Hello,

We’re running 1.9.2 on YARN, and are seeing some interesting behavior when 
submitting jobs in a multi-threaded fashion to an application’s Flink cluster. 
The error we see reported in the client application logs is the following:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 8e1c2fdd68feee100d8fee003efef3e2)
           at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
           at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
           at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
           at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
...
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
           at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
           at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
           at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
           at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
           at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
           at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
           at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
           at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
           at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
           at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
           at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
           ... 3 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Job has already been 
submitted.

Looking through the JobManager logs, I see this interesting sequence for JobID 
8e1c2fdd68feee100d8fee003efef3e2:

2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received 
JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 
20 14:01:42 EST 2021).
2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job 
8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 
2021).
2021-01-20 14:07:25,843 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing 
job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 
(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:26,109 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart 
strategy NoRestartStrategy for Flink Java Job at Wed Jan 20 14:01:42 EST 2021 
(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:28,705 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.jobmaster.JobMaster                  - Running 
initialization on master for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 
(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManager 
runner for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 
(8e1c2fdd68feee100d8fee003efef3e2) was granted leadership with session id 
00000000-0000-0000-0000-000000000000 at 
akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124.
2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-64] 
org.apache.flink.runtime.jobmaster.JobMaster                  - Starting 
execution of job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 
(8e1c2fdd68feee100d8fee003efef3e2) under job master id 
00000000000000000000000000000000.
2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-64] 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java 
Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched 
from state CREATED to RUNNING.
2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-2] 
org.apache.flink.yarn.YarnResourceManager                     - Registering job 
manager 
00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124
 for job 8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-2] 
org.apache.flink.yarn.YarnResourceManager                     - Registered job 
manager 
00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124
 for job 
8e1c2fdd68feee100d8fee003efef3e2.
2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-64] 
org.apache.flink.yarn.YarnResourceManager                     - Request slot 
with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, 
directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, 
managedMemoryInMB=-1} for job 8e1c2fdd68feee100d8fee003efef3e2 with allocation 
id 5bca3bde577f93169928e04749b45343.
2021-01-20 14:08:45,199 INFO  [flink-akka.actor.default-dispatcher-30] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received 
JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 
20 14:01:42 EST 2021).
2021-01-20 14:09:19,981 INFO  [flink-akka.actor.default-dispatcher-90] 
org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the 
JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 
2021(8e1c2fdd68feee100d8fee003efef3e2).
2021-01-20 14:09:19,982 INFO  [flink-akka.actor.default-dispatcher-90] 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java 
Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched 
from state RUNNING to FAILING.

It would appear that, for job ID 8e1c2fdd68feee100d8fee003efef3e2, the 
cluster somehow received the submission request twice. The client log only 
shows a single submission for this job:

2021-01-20 14:01:55,775 [ProductHistory-18359] INFO  RestClusterClient - 
Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (detached: false).

So while the job is submitted a single time, the dispatcher somehow tries to 
perform two submissions resulting in a failure. How does this happen?

____________

Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.


________________________________

Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices
