Repository: spark Updated Branches: refs/heads/branch-2.2 f3f8c8767 -> 5b6300007
[SPARK-22162][BRANCH-2.2] Executors and the driver should use consistent JobIDs in the RDD commit protocol I have modified SparkHadoopMapReduceWriter so that executors and the driver always use consistent JobIds during the hadoop commit. Before SPARK-18191, spark always used the rddId, it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId as the jobId on the driver's side, and the stageId as the jobId on the executors' side. With this change executors and the driver will consistently uses rddId as the jobId. Also with this change, during the hadoop commit protocol spark uses actual stageId to check whether a stage can be committed unlike before that it was using executors' jobId to do this check. In addition to the existing unit tests, a test has been added to check whether executors and the driver are using the same JobId. The test failed before this change and passed after applying this fix. Author: Reza Safi <rezas...@cloudera.com> Closes #19886 from rezasafi/stagerdd22. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b630000 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b630000 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b630000 Branch: refs/heads/branch-2.2 Commit: 5b63000075b1a01e552a0243b77038409758f7f8 Parents: f3f8c87 Author: Reza Safi <rezas...@cloudera.com> Authored: Tue Dec 5 09:16:22 2017 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Tue Dec 5 09:16:22 2017 -0800 ---------------------------------------------------------------------- .../io/SparkHadoopMapReduceWriter.scala | 12 +++--- .../spark/mapred/SparkHadoopMapRedUtil.scala | 5 ++- .../spark/rdd/PairRDDFunctionsSuite.scala | 44 ++++++++++++++++++++ 3 files changed, 53 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5b630000/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index 376ff9b..3b0a158 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -60,13 +60,13 @@ object SparkHadoopMapReduceWriter extends Logging { hadoopConf: Configuration): Unit = { // Extract context and configuration from RDD. val sparkContext = rdd.context - val stageId = rdd.id + val commitJobId = rdd.id val sparkConf = rdd.conf val conf = new SerializableConfiguration(hadoopConf) // Set up a job. val jobTrackerId = SparkHadoopWriterUtils.createJobTrackerID(new Date()) - val jobAttemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.MAP, 0, 0) + val jobAttemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.MAP, 0, 0) val jobContext = new TaskAttemptContextImpl(conf.value, jobAttemptId) val format = jobContext.getOutputFormatClass @@ -78,7 +78,7 @@ object SparkHadoopMapReduceWriter extends Logging { val committer = FileCommitProtocol.instantiate( className = classOf[HadoopMapReduceCommitProtocol].getName, - jobId = stageId.toString, + jobId = commitJobId.toString, outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] committer.setupJob(jobContext) @@ -89,7 +89,7 @@ object SparkHadoopMapReduceWriter extends Logging { executeTask( context = context, jobTrackerId = jobTrackerId, - sparkStageId = context.stageId, + commitJobId = commitJobId, sparkPartitionId = context.partitionId, sparkAttemptNumber = context.attemptNumber, committer = committer, @@ -112,7 +112,7 @@ object SparkHadoopMapReduceWriter extends Logging { private def executeTask[K, V: ClassTag]( context: TaskContext, jobTrackerId: String, - sparkStageId: Int, + commitJobId: Int, sparkPartitionId: Int, sparkAttemptNumber: Int, committer: FileCommitProtocol, @@ -120,7 +120,7 @@ object SparkHadoopMapReduceWriter extends Logging { outputFormat: Class[_ <: OutputFormat[K, V]], iterator: Iterator[(K, V)]): TaskCommitMessage = { // Set up a task. - val attemptId = new TaskAttemptID(jobTrackerId, sparkStageId, TaskType.REDUCE, + val attemptId = new TaskAttemptID(jobTrackerId, commitJobId, TaskType.REDUCE, sparkPartitionId, sparkAttemptNumber) val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId) committer.setupTask(taskContext) http://git-wip-us.apache.org/repos/asf/spark/blob/5b630000/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index 607283a..764735d 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging { if (shouldCoordinateWithDriver) { val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator val taskAttemptNumber = TaskContext.get().attemptNumber() - val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber) + val stageId = TaskContext.get().stageId() + val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber) if (canCommit) { performCommit() @@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging { logInfo(message) // We need to abort the task so that the driver can reschedule new attempts, if necessary committer.abortTask(mrTaskContext) - throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber) + throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber) } } else { // Speculation is disabled or a user has chosen to manually bypass the commit coordination http://git-wip-us.apache.org/repos/asf/spark/blob/5b630000/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 2820c15..5d4d1ce 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext, OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext} import org.apache.hadoop.util.Progressable +import org.scalatest.Assertions import org.apache.spark._ import org.apache.spark.Partitioner @@ -524,6 +525,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored") } + test("The JobId on the driver and executors should be the same during the commit") { + // Create more than one rdd to mimic stageId not equal to rddId + val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2) + .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) } + .filter { p => p._1 > 0 } + pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored") + assert(JobID.jobid != -1) + } + test("saveAsHadoopFile should respect configured output committers") { val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) val conf = new JobConf() @@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat { } } +class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions { + def setupJob(j: NewJobContext): Unit = { + JobID.jobid = j.getJobID().getId + } + + def needsTaskCommit(t: NewTaskAttempContext): Boolean = false + + def setupTask(t: NewTaskAttempContext): Unit = { + val jobId = t.getTaskAttemptID().getJobID().getId + assert(jobId === JobID.jobid) + } + + def commitTask(t: NewTaskAttempContext): Unit = {} + + def abortTask(t: NewTaskAttempContext): Unit = {} +} + +class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() { + + def checkOutputSpecs(j: NewJobContext): Unit = {} + + def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = { + new NewFakeWriter() + } + + def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = { + new YetAnotherFakeCommitter() + } +} + +object JobID { + var jobid = -1 +} + class ConfigTestFormat() extends NewFakeFormat() with Configurable { var setConfCalled = false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org