[ https://issues.apache.org/jira/browse/SPARK-7563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Or updated SPARK-7563:
-----------------------------
    Priority: Critical  (was: Major)

OutputCommitCoordinator.stop() should only be executed in driver
----------------------------------------------------------------

                 Key: SPARK-7563
                 URL: https://issues.apache.org/jira/browse/SPARK-7563
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.3.1
         Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
                      Spark 1.3.1 Release
            Reporter: Hailong Wen
            Priority: Critical

I am from the IBM Platform Symphony team and we are integrating Spark 1.3.1 with EGO (a resource management product).

In EGO we use a fine-grained dynamic allocation policy, and each executor exits as soon as all of its tasks are done. When testing *spark-shell*, we find that when the executors of the first job exit, they stop the OutputCommitCoordinator, which causes all subsequent jobs to fail. Details are as follows.

We get the following error on the executor when submitting a job in *spark-shell* a second time (the first job submission succeeds):

{noformat}
15/05/11 04:02:31 INFO spark.util.AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@whlspark01:50452/user/OutputCommitCoordinator
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@whlspark01:50452/), Path(/user/OutputCommitCoordinator)]
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
        at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}

And on the driver side, we see a log message showing that the OutputCommitCoordinator was stopped after the first submission:

{noformat}
15/05/11 04:01:23 INFO spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
{noformat}

We examined the code of OutputCommitCoordinator and found that executors reuse the ref of the driver's OutputCommitCoordinatorActor. So when an executor exits, it eventually calls SparkEnv.stop():

{noformat}
  private[spark] def stop() {
    isStopped = true
    pythonWorkers.foreach { case (key, worker) => worker.stop() }
    Option(httpFileServer).foreach(_.stop())
    mapOutputTracker.stop()
    shuffleManager.stop()
    broadcastManager.stop()
    blockManager.stop()
    blockManager.master.stop()
    metricsSystem.stop()
    outputCommitCoordinator.stop()    <---------------
    actorSystem.shutdown()
    ......
{noformat}

and in OutputCommitCoordinator.stop():

{noformat}
  def stop(): Unit = synchronized {
    coordinatorActor.foreach(_ ! StopCoordinator)
    coordinatorActor = None
    authorizedCommittersByStage.clear()
  }
{noformat}

We currently work around this problem by adding an "isDriver" attribute to OutputCommitCoordinator and checking whether the "stop" command comes from the driver or from an executor:

{noformat}
diff SparkEnv.scala
360c360
<     new OutputCommitCoordinator(conf, isDriver)
---
>     new OutputCommitCoordinator(conf)

diff OutputCommitCoordinator.scala
43c43
< private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean = false) extends Logging {
---
> private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
137,141c137,139
<     if (isDriver) {
<       coordinatorActor.foreach(_ ! StopCoordinator)
<       coordinatorActor = None
<       authorizedCommittersByStage.clear()
<     }
---
>     coordinatorActor.foreach(_ ! StopCoordinator)
>     coordinatorActor = None
>     authorizedCommittersByStage.clear()
{noformat}
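For clarity, the sketch below shows roughly what OutputCommitCoordinator.stop() looks like once the diff above is applied. It is only an illustration of the workaround, reusing the member names (coordinatorActor, authorizedCommittersByStage) from the 1.3.1 code quoted earlier; the isDriver flag is the new attribute introduced by the patch:

{noformat}
// Sketch of the guarded stop(): only the driver tears down the coordinator actor.
// `isDriver` is the new constructor parameter added by the workaround diff;
// on executors the call becomes a no-op, so the driver's actor keeps running.
def stop(): Unit = synchronized {
  if (isDriver) {
    coordinatorActor.foreach(_ ! StopCoordinator)
    coordinatorActor = None
    authorizedCommittersByStage.clear()
  }
}
{noformat}

With this guard, an executor-side SparkEnv.stop() no longer shuts down the driver's OutputCommitCoordinatorActor, so later jobs submitted from the same *spark-shell* can still reach it.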
We propose to apply this fix in a future release, since the issue may affect all *spark-shell* functionality under the dynamic allocation model.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org