Repository: spark
Updated Branches:
  refs/heads/branch-1.3 cc5f711c0 -> 265ec35bc


[SPARK-7563] (backport for 1.3) OutputCommitCoordinator.stop() should only run on the driver

Backport of "[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver" for 1.3

Author: Sean Owen <so...@cloudera.com>

Closes #7865 from srowen/SPARK-7563-1.3 and squashes the following commits:

f4479bc [Sean Owen] Backport of "[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver" for 1.3


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/265ec35b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/265ec35b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/265ec35b

Branch: refs/heads/branch-1.3
Commit: 265ec35bc8938939ac55d90b09e6a1a3773155eb
Parents: cc5f711
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Aug 3 13:59:00 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Aug 3 13:59:00 2015 +0100

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala       |  2 +-
 .../apache/spark/scheduler/OutputCommitCoordinator.scala  | 10 ++++++----
 .../spark/scheduler/OutputCommitCoordinatorSuite.scala    |  2 +-
 3 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/265ec35b/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d95a176..9c24afb 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -357,7 +357,7 @@ object SparkEnv extends Logging {
     }
 
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      new OutputCommitCoordinator(conf)
+      new OutputCommitCoordinator(conf, isDriver)
     }
     val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
       new OutputCommitCoordinatorActor(outputCommitCoordinator))

http://git-wip-us.apache.org/repos/asf/spark/blob/265ec35b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index d89721c..4c70958 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -40,7 +40,7 @@ private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttem
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
 
   // Initialized by SparkEnv
   var coordinatorActor: Option[ActorRef] = None
@@ -134,9 +134,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
   }
 
   def stop(): Unit = synchronized {
-    coordinatorActor.foreach(_ ! StopCoordinator)
-    coordinatorActor = None
-    authorizedCommittersByStage.clear()
+    if (isDriver) {
+      coordinatorActor.foreach(_ ! StopCoordinator)
+      coordinatorActor = None
+      authorizedCommittersByStage.clear()
+    }
   }
 
   // Marked private[scheduler] instead of private so this can be mocked in tests

http://git-wip-us.apache.org/repos/asf/spark/blob/265ec35b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 52f126f..14ecf72 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -81,7 +81,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
           conf: SparkConf,
           isLocal: Boolean,
           listenerBus: LiveListenerBus): SparkEnv = {
-        outputCommitCoordinator = spy(new OutputCommitCoordinator(conf))
+        outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
         SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to