Repository: spark Updated Branches: refs/heads/branch-1.3 cc5f711c0 -> 265ec35bc
[SPARK-7563] (backport for 1.3) OutputCommitCoordinator.stop() should only run on the driver Backport of "[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver" for 1.3 Author: Sean Owen <so...@cloudera.com> Closes #7865 from srowen/SPARK-7563-1.3 and squashes the following commits: f4479bc [Sean Owen] Backport of "[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver" for 1.3 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/265ec35b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/265ec35b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/265ec35b Branch: refs/heads/branch-1.3 Commit: 265ec35bc8938939ac55d90b09e6a1a3773155eb Parents: cc5f711 Author: Sean Owen <so...@cloudera.com> Authored: Mon Aug 3 13:59:00 2015 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Mon Aug 3 13:59:00 2015 +0100 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../apache/spark/scheduler/OutputCommitCoordinator.scala | 10 ++++++---- .../spark/scheduler/OutputCommitCoordinatorSuite.scala | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/265ec35b/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index d95a176..9c24afb 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -357,7 +357,7 @@ object SparkEnv extends Logging { } val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse { - new OutputCommitCoordinator(conf) + new OutputCommitCoordinator(conf, isDriver) } val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator", new OutputCommitCoordinatorActor(outputCommitCoordinator)) http://git-wip-us.apache.org/repos/asf/spark/blob/265ec35b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index d89721c..4c70958 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -40,7 +40,7 @@ private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttem * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) * for an extensive design discussion. */ -private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { +private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging { // Initialized by SparkEnv var coordinatorActor: Option[ActorRef] = None @@ -134,9 +134,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { } def stop(): Unit = synchronized { - coordinatorActor.foreach(_ ! StopCoordinator) - coordinatorActor = None - authorizedCommittersByStage.clear() + if (isDriver) { + coordinatorActor.foreach(_ ! StopCoordinator) + coordinatorActor = None + authorizedCommittersByStage.clear() + } } // Marked private[scheduler] instead of private so this can be mocked in tests http://git-wip-us.apache.org/repos/asf/spark/blob/265ec35b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 52f126f..14ecf72 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -81,7 +81,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = { - outputCommitCoordinator = spy(new OutputCommitCoordinator(conf)) + outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true)) // Use Mockito.spy() to maintain the default infrastructure everywhere else. // This mocking allows us to control the coordinator responses in test cases. SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org