Repository: spark
Updated Branches:
  refs/heads/master 3927bb9b4 -> f81401e1c


[SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol

I have modified SparkHadoopWriter so that executors and the driver always use 
consistent JobIDs during the Hadoop commit. Before SPARK-18191, Spark always 
used the rddId; the variable was just incorrectly named stageId. After 
SPARK-18191, the driver used the rddId as the jobId while the executors used 
the stageId. With this change, the executors and the driver consistently use 
the rddId as the jobId. Also with this change, during the Hadoop commit 
protocol Spark uses the actual stageId to check whether a stage can be 
committed, instead of the executors' jobId as before.
In addition to the existing unit tests, a test has been added to check that 
the executors and the driver use the same JobID. The test failed before this 
change and passes with this fix.
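
For reference, here is a minimal, self-contained sketch (hypothetical, using 
only the standard Hadoop MapReduce classes rather than the Spark code paths 
touched by this commit) of why feeding the same id to both sides yields 
matching JobIDs; commitJobId and jobTrackerId stand in for rdd.id and the 
timestamp-based job tracker id:

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}

object ConsistentJobIdSketch {
  def main(args: Array[String]): Unit = {
    val commitJobId = 42  // hypothetical stand-in for rdd.id
    val jobTrackerId =
      new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
    val conf = new Configuration()

    // Driver side: job-level context keyed on commitJobId.
    val jobContext = new JobContextImpl(conf, new JobID(jobTrackerId, commitJobId))

    // Executor side: task-level context built from the same commitJobId.
    val taskId = new TaskID(new JobID(jobTrackerId, commitJobId), TaskType.REDUCE, 0)
    val taskContext = new TaskAttemptContextImpl(conf, new TaskAttemptID(taskId, 0))

    // Both sides now report the same underlying job id.
    assert(jobContext.getJobID.getId == taskContext.getTaskAttemptID.getJobID.getId)
  }
}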

Author: Reza Safi <rezas...@cloudera.com>

Closes #19848 from rezasafi/stagerddsimple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f81401e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f81401e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f81401e1

Branch: refs/heads/master
Commit: f81401e1cb39f2d6049b79dc8d61305f3371276f
Parents: 3927bb9
Author: Reza Safi <rezas...@cloudera.com>
Authored: Mon Dec 4 09:23:48 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Dec 4 09:23:48 2017 -0800

----------------------------------------------------------------------
 .../spark/internal/io/SparkHadoopWriter.scala   | 12 +++---
 .../spark/mapred/SparkHadoopMapRedUtil.scala    |  5 ++-
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 44 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f81401e1/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 949d8c6..abf3921 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
       config: HadoopWriteConfigUtil[K, V]): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id
 
     // Set up a job.
     val jobTrackerId = createJobTrackerID(new Date())
-    val jobContext = config.createJobContext(jobTrackerId, stageId)
+    val jobContext = config.createJobContext(jobTrackerId, commitJobId)
     config.initOutputFormat(jobContext)
 
     // Assert the output format/key/value class is set in JobConf.
     config.assertConf(jobContext, rdd.conf)
 
-    val committer = config.createCommitter(stageId)
+    val committer = config.createCommitter(commitJobId)
     committer.setupJob(jobContext)
 
     // Try to write all RDD partitions as a Hadoop OutputFormat.
@@ -80,7 +80,7 @@ object SparkHadoopWriter extends Logging {
           context = context,
           config = config,
           jobTrackerId = jobTrackerId,
-          sparkStageId = context.stageId,
+          commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
           sparkAttemptNumber = context.attemptNumber,
           committer = committer,
@@ -102,14 +102,14 @@ object SparkHadoopWriter extends Logging {
       context: TaskContext,
       config: HadoopWriteConfigUtil[K, V],
       jobTrackerId: String,
-      sparkStageId: Int,
+      commitJobId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
     val taskContext = config.createTaskAttemptContext(
-      jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+      jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
     committer.setupTask(taskContext)
 
     val (outputMetrics, callback) = initHadoopOutputMetrics(context)

http://git-wip-us.apache.org/repos/asf/spark/blob/f81401e1/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 607283a..764735d 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
 
         if (canCommit) {
           performCommit()
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+          throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the commit coordination

http://git-wip-us.apache.org/repos/asf/spark/blob/f81401e1/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 0a248b6..65d3526 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
   OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
   RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
 import org.apache.hadoop.util.Progressable
+import org.scalatest.Assertions
 
 import org.apache.spark._
 import org.apache.spark.Partitioner
@@ -524,6 +525,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on the driver and executors should be the same during the 
commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+      .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
+      .filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+    assert(JobID.jobid != -1)
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
+}
+
 class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   var setConfCalled = false

