Repository: spark
Updated Branches:
  refs/heads/master b9b6fbe89 -> 0a5aef753


[SPARK-10666][SPARK-6880][CORE] Use properties from ActiveJob associated with a 
Stage

This issue was addressed in https://github.com/apache/spark/pull/5494, but the 
fix in that PR, while safe in the sense that it will prevent the SparkContext 
from shutting down, misses the actual bug.  The intent of `submitMissingTasks` 
should be understood as "submit the Tasks that are missing for the Stage, and 
run them as part of the ActiveJob identified by jobId".  Because of a 
long-standing bug, the `jobId` parameter was never being used.  Instead, we 
were trying to use the jobId with which the Stage was created -- which may no 
longer exist as an ActiveJob, hence the crash reported in SPARK-6880.

The correct fix is to use the ActiveJob specified by the supplied jobId 
parameter, which is guaranteed to exist at the call sites of submitMissingTasks.

This fix should be applied to all maintenance branches, since the bug has 
existed since 1.0.

kayousterhout pankajarora12

Author: Mark Hamstra <markhams...@gmail.com>
Author: Imran Rashid <iras...@cloudera.com>

Closes #6291 from markhamstra/SPARK-6880.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a5aef75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a5aef75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a5aef75

Branch: refs/heads/master
Commit: 0a5aef753e70e93d7e56054f354a52e4d4e18932
Parents: b9b6fbe
Author: Mark Hamstra <markhams...@gmail.com>
Authored: Wed Nov 25 09:34:34 2015 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Wed Nov 25 09:34:34 2015 -0600

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |   6 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 107 ++++++++++++++++++-
 2 files changed, 109 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a5aef75/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 77a184d..e01a960 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -946,7 +946,9 @@ class DAGScheduler(
       stage.resetInternalAccumulators()
     }
 
-    val properties = 
jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
+    // Use the scheduling pool, job group, description, etc. from an ActiveJob 
associated
+    // with this Stage
+    val properties = jobIdToActiveJob(jobId).properties
 
     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether 
tasks are
@@ -1047,7 +1049,7 @@ class DAGScheduler(
       stage.pendingPartitions ++= tasks.map(_.partitionId)
       logDebug("New pending partitions: " + stage.pendingPartitions)
       taskScheduler.submitTasks(new TaskSet(
-        tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, 
properties))
+        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, 
properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark

http://git-wip-us.apache.org/repos/asf/spark/blob/0a5aef75/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4d6b254..653d41f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Properties
+
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -262,9 +264,10 @@ class DAGSchedulerSuite
       rdd: RDD[_],
       partitions: Array[Int],
       func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
-      listener: JobListener = jobListener): Int = {
+      listener: JobListener = jobListener,
+      properties: Properties = null): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), 
listener))
+    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), 
listener, properties))
     jobId
   }
 
@@ -1322,6 +1325,106 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, 
priority: Int): Unit = {
+    assert(taskSet.properties != null)
+    assert(taskSet.properties.getProperty("testProperty") === expected)
+    assert(taskSet.priority === priority)
+  }
+
+  def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, 
Nothing] = {
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
+    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
+    val shuffleDep2 = new ShuffleDependency(intermediateRdd, new 
HashPartitioner(1))
+    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
+    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
+    val job1Properties = new Properties()
+    val job2Properties = new Properties()
+    job1Properties.setProperty("testProperty", "job1")
+    job2Properties.setProperty("testProperty", "job2")
+
+    // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
+    // Note that we have to submit job2 before we cancel job1 to have them 
actually share
+    // *Stages*, and not just shuffle dependencies, due to skipped stages (at 
least until
+    // we address SPARK-10193.)
+    val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
+    val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
+    assert(scheduler.activeJobs.nonEmpty)
+    val testProperty1 = 
scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
+
+    // remove job1 as an ActiveJob
+    cancel(jobId1)
+
+    // job2 should still be running
+    assert(scheduler.activeJobs.nonEmpty)
+    val testProperty2 = 
scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
+    assert(testProperty1 != testProperty2)
+    // NB: This next assert isn't necessarily the "desired" behavior; it's 
just to document
+    // the current behavior.  We've already submitted the TaskSet for stage 0 
based on job1, but
+    // even though we have cancelled that job and are now running it because 
of job2, we haven't
+    // updated the TaskSet's properties.  Changing the properties to "job2" is 
likely the more
+    // correct behavior.
+    val job1Id = 0  // TaskSet priority for Stages run with "job1" as the 
ActiveJob
+    checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+
+    shuffleDep1
+  }
+
+  /**
+   * Makes sure that tasks for a stage used by multiple jobs are submitted 
with the properties of a
+   * later, active job if they were previously run under a job that is no 
longer active
+   */
+  test("stage used by two jobs, the first no longer active (SPARK-6880)") {
+    launchJobsThatShareStageAndCancelFirst()
+
+    // The next check is the key for SPARK-6880.  For the stage which was 
shared by both job1 and
+    // job2 but never had any tasks submitted for job1, the properties of job2 
are now used to run
+    // the stage.
+    checkJobPropertiesAndPriority(taskSets(1), "job2", 1)
+
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    assert(taskSets(2).properties != null)
+    complete(taskSets(2), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(scheduler.activeJobs.isEmpty)
+
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * Makes sure that tasks for a stage used by multiple jobs are submitted 
with the properties of a
+   * later, active job if they were previously run under a job that is no 
longer active, even when
+   * there are fetch failures
+   */
+  test("stage used by two jobs, some fetch failures, and the first job no 
longer active " +
+    "(SPARK-6880)") {
+    val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
+    val job2Id = 1  // TaskSet priority for Stages run with "job2" as the 
ActiveJob
+
+    // let's say there is a fetch failure in this task set, which makes us go 
back and
+    // run stage 0, attempt 1
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, 
"ignored"), null)))
+    scheduler.resubmitFailedStages()
+
+    // stage 0, attempt 1 should have the properties of job2
+    assert(taskSets(2).stageId === 0)
+    assert(taskSets(2).stageAttemptId === 1)
+    checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)
+
+    // run the rest of the stages normally, checking that they have the 
correct properties
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
+    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
+    checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
+    complete(taskSets(4), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(scheduler.activeJobs.isEmpty)
+
+    assertDataStructuresEmpty()
+  }
+
   test("run trivial shuffle with out-of-band failure and retry") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to