Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ab1b8edb8 -> 277733b1d


[SPARK-6737] Fix memory leak in OutputCommitCoordinator

This patch fixes a memory leak in the OutputCommitCoordinator that was caused by
the DAGScheduler and leaked one map entry per submitted stage. The problem is
that the OutputCommitCoordinator needs to be informed when stages end in order
to remove entries from its `authorizedCommittersByStage` map, but the
DAGScheduler only notified it in one of the four code paths that are used to
mark stages as completed.

This patch fixes the issue by consolidating the processing of stage completion
into a single class-level `markStageAsFinished` method (previously a local
helper nested inside another method) that always calls
`outputCommitCoordinator.stageEnd`. It also updates DAGSchedulerSuite's
`assertDataStructuresEmpty` assertion to check the OutputCommitCoordinator's
data structures. I've also added a checklist comment at the top of DAGScheduler
so that we remember to update this test when adding new data structures.

Author: Josh Rosen <joshro...@databricks.com>

Closes #5397 from JoshRosen/SPARK-6737 and squashes the following commits:

af3b02f [Josh Rosen] Consolidate stage completion handling code in a single 
method.
e96ce3a [Josh Rosen] Consolidate stage completion handling code in a single 
method.
3052aea [Josh Rosen] Comment update
7896899 [Josh Rosen] Fix SPARK-6737 by informing OutputCommitCoordinator of all 
stage end events.
4ead1dc [Josh Rosen] Add regression tests for SPARK-6737

(cherry picked from commit c83e03948b184ffb3a9418fecc4d2c26ae33b057)
Signed-off-by: Josh Rosen <joshro...@databricks.com>

Conflicts:
        core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/277733b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/277733b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/277733b1

Branch: refs/heads/branch-1.3
Commit: 277733b1daa16fcd39e1ab30ba73b040affc3810
Parents: ab1b8ed
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Apr 7 16:18:55 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Apr 7 16:26:07 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 63 +++++++++++---------
 .../scheduler/OutputCommitCoordinator.scala     |  7 +++
 .../spark/scheduler/DAGSchedulerSuite.scala     |  1 +
 3 files changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/277733b1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c10873e..4a79eba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -54,6 +54,10 @@ import 
org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
  * not caused by shuffle file loss are handled by the TaskScheduler, which 
will retry each task
  * a small number of times before cancelling the whole stage.
  *
+ * Here's a checklist to use when making or reviewing changes to this class:
+ *
+ *  - When adding a new data structure, update 
`DAGSchedulerSuite.assertDataStructuresEmpty` to
+ *    include the new structure. This will help to catch memory leaks.
  */
 private[spark]
 class DAGScheduler(
@@ -115,6 +119,8 @@ class DAGScheduler(
   //       stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
 
+  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
+
   // A closure serializer that we reuse.
   // This is only safe because DAGScheduler runs in a single thread.
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
@@ -132,8 +138,6 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new 
DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
-  private val outputCommitCoordinator = env.outputCommitCoordinator
-
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -698,9 +702,10 @@ class DAGScheduler(
       // cancelling the stages because if the DAG scheduler is stopped, the 
entire application
       // is in the process of getting stopped.
       val stageFailedMessage = "Stage cancelled because SparkContext was shut 
down"
-      runningStages.foreach { stage =>
-        stage.latestInfo.stageFailed(stageFailedMessage)
-        listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+      // The `toArray` here is necessary so that we don't iterate over 
`runningStages` while
+      // mutating it.
+      runningStages.toArray.foreach { stage =>
+        markStageAsFinished(stage, Some(stageFailedMessage))
       }
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), 
JobFailed(error)))
     }
@@ -868,13 +873,11 @@ class DAGScheduler(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), 
stage.jobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
-      // Because we posted SparkListenerStageSubmitted earlier, we should post
-      // SparkListenerStageCompleted here in case there are no tasks to run.
-      outputCommitCoordinator.stageEnd(stage.id)
-      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+      // Because we posted SparkListenerStageSubmitted earlier, we should mark
+      // the stage as completed here in case there are no tasks to run
+      markStageAsFinished(stage, None)
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
-      runningStages -= stage
     }
   }
 
@@ -932,22 +935,6 @@ class DAGScheduler(
     }
 
     val stage = stageIdToStage(task.stageId)
-
-    def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) 
= {
-      val serviceTime = stage.latestInfo.submissionTime match {
-        case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
-        case _ => "Unknown"
-      }
-      if (errorMessage.isEmpty) {
-        logInfo("%s (%s) finished in %s s".format(stage, stage.name, 
serviceTime))
-        stage.latestInfo.completionTime = Some(clock.getTimeMillis())
-      } else {
-        stage.latestInfo.stageFailed(errorMessage.get)
-        logInfo("%s (%s) failed in %s s".format(stage, stage.name, 
serviceTime))
-      }
-      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
-      runningStages -= stage
-    }
     event.reason match {
       case Success =>
         listenerBus.post(SparkListenerTaskEnd(stageId, 
stage.latestInfo.attemptId, taskType,
@@ -1055,7 +1042,6 @@ class DAGScheduler(
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
           markStageAsFinished(failedStage, Some(failureMessage))
-          runningStages -= failedStage
         }
 
         if (disallowStageRetryForTest) {
@@ -1172,6 +1158,26 @@ class DAGScheduler(
   }
 
   /**
+   * Marks a stage as finished and removes it from the list of running stages.
+   */
+  private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = 
None): Unit = {
+    val serviceTime = stage.latestInfo.submissionTime match {
+      case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
+      case _ => "Unknown"
+    }
+    if (errorMessage.isEmpty) {
+      logInfo("%s (%s) finished in %s s".format(stage, stage.name, 
serviceTime))
+      stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+    } else {
+      stage.latestInfo.stageFailed(errorMessage.get)
+      logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
+    }
+    outputCommitCoordinator.stageEnd(stage.id)
+    listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+    runningStages -= stage
+  }
+
+  /**
    * Aborts all jobs depending on a particular Stage. This is called in 
response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this 
event from outside.
    */
@@ -1222,8 +1228,7 @@ class DAGScheduler(
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
               taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-              stage.latestInfo.stageFailed(failureReason)
-              listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+              markStageAsFinished(stage, Some(failureReason))
             } catch {
               case e: UnsupportedOperationException =>
                 logInfo(s"Could not cancel tasks for stage $stageId", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/277733b1/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 25d90f0..d89721c 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -65,6 +65,13 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf) extends Logging {
   private type CommittersByStageMap = mutable.Map[StageId, 
mutable.Map[PartitionId, TaskAttemptId]]
 
   /**
+   * Returns whether the OutputCommitCoordinator's internal data structures 
are all empty.
+   */
+  def isEmpty: Boolean = {
+    authorizedCommittersByStage.isEmpty
+  }
+
+  /**
    * Called by tasks to ask whether they can commit their output to HDFS.
    *
    * If a task attempt has been authorized to commit, then all other attempts 
to commit the same

http://git-wip-us.apache.org/repos/asf/spark/blob/277733b1/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9d0c127..febfe08 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -765,6 +765,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with 
BeforeAndAfter with LocalSpar
     assert(scheduler.runningStages.isEmpty)
     assert(scheduler.shuffleToMapStage.isEmpty)
     assert(scheduler.waitingStages.isEmpty)
+    assert(scheduler.outputCommitCoordinator.isEmpty)
   }
 
   // Nothing in this test should break if the task info's fields are null, but


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to