Repository: spark
Updated Branches:
  refs/heads/branch-2.2 f3f8c8767 -> 5b6300007


[SPARK-22162][BRANCH-2.2] Executors and the driver should use consistent JobIDs 
in the RDD commit protocol

I have modified SparkHadoopMapReduceWriter so that executors and the driver 
always use consistent JobIds during the Hadoop commit. Before SPARK-18191, Spark 
always used the rddId; the variable was just incorrectly named stageId. After 
SPARK-18191, the driver used the rddId as the jobId while the executors used the 
stageId as theirs, so the two sides disagreed. With this change, executors and 
the driver consistently use the rddId as the jobId. Also, during the Hadoop 
commit protocol, Spark now uses the actual stageId to check whether a stage can 
be committed, rather than the executors' jobId as it did before.

In addition to the existing unit tests, a test has been added to check that 
executors and the driver use the same JobId. The test failed before this change 
and passes after applying this fix.
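
Below is a minimal, self-contained Scala sketch (not part of the patch) of the 
invariant the new test checks: when both sides derive the Hadoop job id from 
rdd.id, the driver's JobID and the JobID embedded in an executor task's 
TaskAttemptID agree. The commitJobId value and the timestamp format are 
illustrative stand-ins for rdd.id and the output of 
SparkHadoopWriterUtils.createJobTrackerID.

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskType}

object JobIdConsistencySketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for rdd.id; after this change both sides derive the job id from it.
    val commitJobId = 42
    // Stand-in for the jobTrackerId string built from the current date.
    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())

    // Driver side: the JobID under which the committer sets up the job.
    val driverJobId = new JobID(jobTrackerId, commitJobId)

    // Executor side: the JobID embedded in each task's TaskAttemptID.
    val taskAttemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.REDUCE, 0, 0)

    // Both sides now see the same job id; before the fix the executor side used
    // the stage id here, so the two ids could differ.
    assert(driverJobId.getId == taskAttemptId.getJobID.getId)
  }
}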

Author: Reza Safi <rezas...@cloudera.com>

Closes #19886 from rezasafi/stagerdd22.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b630000
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b630000
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b630000

Branch: refs/heads/branch-2.2
Commit: 5b63000075b1a01e552a0243b77038409758f7f8
Parents: f3f8c87
Author: Reza Safi <rezas...@cloudera.com>
Authored: Tue Dec 5 09:16:22 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Dec 5 09:16:22 2017 -0800

----------------------------------------------------------------------
 .../io/SparkHadoopMapReduceWriter.scala         | 12 +++---
 .../spark/mapred/SparkHadoopMapRedUtil.scala    |  5 ++-
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 44 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b630000/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 376ff9b..3b0a158 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -60,13 +60,13 @@ object SparkHadoopMapReduceWriter extends Logging {
       hadoopConf: Configuration): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id
     val sparkConf = rdd.conf
     val conf = new SerializableConfiguration(hadoopConf)
 
     // Set up a job.
     val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date())
-    val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0)
+    val jobAttemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.MAP, 0, 0)
     val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId)
     val format = jobContext.getOutputFormatClass
 
@@ -78,7 +78,7 @@ object SparkHadoopMapReduceWriter extends Logging {
 
     val committer = FileCommitProtocol.instantiate(
       className = classOf[HadoopMapReduceCommitProtocol].getName,
-      jobId = stageId.toString,
+      jobId = commitJobId.toString,
       outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
       isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
     committer.setupJob(jobContext)
@@ -89,7 +89,7 @@ object SparkHadoopMapReduceWriter extends Logging {
         executeTask(
           context = context,
           jobTrackerId = jobTrackerId,
-          sparkStageId = context.stageId,
+          commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
           sparkAttemptNumber = context.attemptNumber,
           committer = committer,
@@ -112,7 +112,7 @@ object SparkHadoopMapReduceWriter extends Logging {
   private def executeTask[K, V: ClassTag](
       context: TaskContext,
       jobTrackerId: String,
-      sparkStageId: Int,
+      commitJobId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
@@ -120,7 +120,7 @@ object SparkHadoopMapReduceWriter extends Logging {
       outputFormat: Class[_ <: OutputFormat[K, V]],
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
-    val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE,
+    val attemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.REDUCE,
       sparkPartitionId, sparkAttemptNumber)
     val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     committer.setupTask(taskContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/5b630000/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 607283a..764735d 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
 
         if (canCommit) {
           performCommit()
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+          throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the commit coordination

http://git-wip-us.apache.org/repos/asf/spark/blob/5b630000/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 2820c15..5d4d1ce 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
   OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
   RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
 import org.apache.hadoop.util.Progressable
+import org.scalatest.Assertions
 
 import org.apache.spark._
 import org.apache.spark.Partitioner
@@ -524,6 +525,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on the driver and executors should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+      .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
+      .filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+    assert(JobID.jobid != -1)
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
+}
+
 class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   var setConfCalled = false

