[ 
https://issues.apache.org/jira/browse/FLINK-11375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-11375:
------------------------------

    Assignee: BoWang

> Concurrent modification to slot pool due to SlotSharingManager calling 
> releaseSlot directly 
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-11375
>                 URL: https://issues.apache.org/jira/browse/FLINK-11375
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.7.1
>            Reporter: shuai.xu
>            Assignee: BoWang
>            Priority: Major
>             Fix For: 1.8.0
>
>
> In SlotPool, AvailableSlots uses no locking, so all access to it should 
> happen in the main thread of SlotPool. For this reason, all the public 
> methods are called through SlotPoolGateway, except releaseSlot, which is 
> called directly by SlotSharingManager. 
> This may cause a ConcurrentModificationException.
>  2019-01-16 19:50:16,184 INFO [flink-akka.actor.default-dispatcher-161] 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: 
> BlinkStoreScanTableSource 
> feature_memory_entity_store-entity_lsc_page_detail_feats_group_178-Batch -> 
> SourceConversion(table:[_DataStreamTable_12, source: 
> [BlinkStoreScanTableSource 
> feature_memory_entity_store-entity_lsc_page_detail_feats_group_178]], 
> fields:(f0)) -> correlate: 
> table(ScanBlinkStore_entity_lsc_page_detail_feats_group_1786($cor6.f0)), 
> select: 
> item_id,mainse_searcher_rank__cart_uv,mainse_searcher_rank__cart_uv_14,mainse_searcher_rank__cart_uv_30,mainse_searcher_rank__cart_uv_7,mainse_s
>  (433/500) (bd34af8dd7ee02d04a4a25e698495f0a) switched from RUNNING to 
> FINISHED.
>  2019-01-16 19:50:16,187 INFO [jobmanager-future-thread-90] 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - scheduleVertices 
> meet exception, need to fail global execution graph
>  java.lang.reflect.UndeclaredThrowableException
>  at org.apache.flink.runtime.rpc.akka.$Proxy26.allocateSlots(Unknown Source)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.allocateSlots(SlotPool.java:1955)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.schedule(ExecutionGraph.java:965)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleVertices(ExecutionGraph.java:1503)
>  at 
> org.apache.flink.runtime.jobmaster.GraphManager$ExecutionGraphVertexScheduler.scheduleExecutionVertices(GraphManager.java:349)
>  at 
> org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.scheduleOneByOne(StepwiseSchedulingPlugin.java:132)
>  at 
> org.apache.flink.runtime.schedule.StepwiseSchedulingPlugin.onExecutionVertexFailover(StepwiseSchedulingPlugin.java:107)
>  at 
> org.apache.flink.runtime.jobmaster.GraphManager.notifyExecutionVertexFailover(GraphManager.java:163)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.resetExecutionVerticesAndNotify(ExecutionGraph.java:1372)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.restart(FailoverRegion.java:213)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.reset(FailoverRegion.java:198)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.allVerticesInTerminalState(FailoverRegion.java:97)
>  at 
> org.apache.flink.runtime.executiongraph.failover.FailoverRegion.lambda$cancel$0(FailoverRegion.java:169)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:186)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:299)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>  at java.lang.Thread.run(Thread.java:834)
>  Caused by: java.util.concurrent.ExecutionException: 
> java.util.ConcurrentModificationException
>  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:213)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:125)
>  ... 23 more
>  Caused by: java.util.ConcurrentModificationException
>  at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1643)
>  at 
> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
>  at 
> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498)
>  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485)
>  at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>  at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
>  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy.findPreviousAllocation(PreviousAllocationSchedulingStrategy.java:77)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy.findMatchWithLocality(PreviousAllocationSchedulingStrategy.java:61)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool$AvailableSlots.poll(SlotPool.java:1755)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool$AvailableSlots.poll(SlotPool.java:1790)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.pollAndAllocateSlots(SlotPool.java:1094)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.requestAllocatedSlots(SlotPool.java:886)
>  at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool.allocateSlots(SlotPool.java:590)
>  at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>  at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>  at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to