[ https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann updated FLINK-11375: ---------------------------------- Component/s: Distributed Coordination > Concurrent modification to slot pool due to SlotSharingManager releaseSlot > directly > ------------------------------------------------------------------------------------ > > Key: FLINK-11375 > URL: https://issues.apache.org/jira/browse/FLINK-11375 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, JobManager > Affects Versions: 1.7.1 > Reporter: shuai.xu > Assignee: BoWang > Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > In SlotPool, the AvailableSlots is lock free, so all access to it should be in > the main thread of SlotPool, and so all the public methods are called through > SlotPoolGateway except the releaseSlot, which is directly called by SlotSharingManager. > This may cause a ConcurrentModificationException. > 2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > BlinkStoreScanTableSource > feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> > SourceConversion(table:[_DataStreamTable_12, source: > [BlinkStoreScanTableSource > feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], > fields:(f0)) -> correlate: > table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), > select: > item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s > (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to > FINISHED. 
> 2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] > org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices > meet exception, need to fail global execution graph > java.lang.reflect.UndeclaredThrowableException > at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503) > at > org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349) > at > org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132) > at > org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107) > at > org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97) > at > org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > at java.lang.Thread.run(Thread.java:834) > Caused by: java.util.concurrent.ExecutionException: > java.util.ConcurrentModificationException > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125) > ... 
23 more > Caused by: java.util.ConcurrentModificationException > at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643) > at > java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) > at > java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464) > at > org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy.findPreviousAllocation(PreviousAllocationSchedulingStrategy.java:77) > at > org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy.findMatchWithLocality(PreviousAllocationSchedulingStrategy.java:61) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool$AvailableSlots.poll(SlotPool.java:1755) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool$AvailableSlots.poll(SlotPool.java:1790) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.pollAndAllocateSlots(SlotPool.java:1094) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.requestAllocatedSlots(SlotPool.java:886) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPool.allocateSlots(SlotPool.java:590) > at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- This message was sent by Atlassian JIRA (v7.6.3#76005)