[ 
https://issues.apache.org/jira/browse/FLINK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197066#comment-17197066
 ] 

Stephan Ewen commented on FLINK-19264:
--------------------------------------

Good point, the execution attempt ID needs to be unique. At the very least, it 
should include the jobId.

That should make it unique on all same setups. If you submit the identical job 
graph (same Job ID) twice you probably already clash on the JobManager.


> Identical jobs cannot be run concurrently
> -----------------------------------------
>
>                 Key: FLINK-19264
>                 URL: https://issues.apache.org/jira/browse/FLINK-19264
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.12.0
>            Reporter: Aljoscha Krettek
>            Priority: Blocker
>
> While working on FLINK-19123 I noticed that jobs often fail on 
> {{MiniCluster}} when multiple jobs are running at the same time. 
> I created a reproducer here: 
> https://github.com/aljoscha/flink/tree/flink-19123-fix-test-stream-env-alternative.
>  You can run {{MiniClusterConcurrencyITCase}} to see the problem in action. 
> Sometimes the test will succeed, sometimes it will fail with
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at 
> org.apache.flink.test.example.MiniClusterConcurrencyITCase.submitConcurrently(MiniClusterConcurrencyITCase.java:60)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>       at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>       at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:107)
>       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>       at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:926)
>       at akka.dispatch.OnComplete.internal(Future.scala:264)
>       at akka.dispatch.OnComplete.internal(Future.scala:261)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>       at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>       at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>       at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>       at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>       at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>       at 
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>       at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>       at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>       at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>       at 
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>       at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>       at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>       at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:215)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:208)
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:202)
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:523)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster$1.onMissingDeploymentsOf(JobMaster.java:237)
>       at 
> org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentReconciler.reconcileExecutionDeployments(DefaultExecutionDeploymentReconciler.java:55)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.reportPayload(JobMaster.java:1229)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.reportPayload(JobMaster.java:1216)
>       at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.receiveHeartbeat(HeartbeatManagerImpl.java:199)
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.heartbeatFromTaskManager(JobMaster.java:674)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>       ... 4 more
> Caused by: org.apache.flink.util.FlinkException: Execution 
> cbc357ccb763df2852fee8c4fc7d55f2_0_0 is unexpectedly no longer running on 
> task executor 0a6edd8c-b1f4-48da-a228-c395b2aff92d.
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster$1.onMissingDeploymentsOf(JobMaster.java:241)
>       ... 31 more
> {code}
> If you reduce the number of submitted jobs the test will always pass.
> Am important piece of the puzzle seems to be this message that you will also 
> find in the log output:
> {code}
> 4594 [mini-cluster-io-thread-25] INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - JobManager for job 
> 580f78b9224c2079ebe0c2f6b08fb4ba with leader id 
> b0ca0700de4f8e8a2555bfe7596741ac lost leadership.
> {code}
> It seems that the JobMasters of the concurrent jobs somehow clash with each 
> other.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to