Repository: spark Updated Branches: refs/heads/branch-2.2 a60000459 -> 72575d0bb
[SPARK-24552][CORE][BRANCH-2.2] Use unique id instead of attempt number for writes . This passes a unique attempt id to the Hadoop APIs, because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted. Author: Marcelo Vanzin <van...@cloudera.com> Closes #21616 from vanzin/SPARK-24552-2.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72575d0b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72575d0b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72575d0b Branch: refs/heads/branch-2.2 Commit: 72575d0bb575cb98798a3ffe08204e97971bfa19 Parents: a600004 Author: Marcelo Vanzin <van...@cloudera.com> Authored: Mon Jun 25 16:56:12 2018 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Mon Jun 25 16:56:12 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/internal/io/SparkHadoopMapReduceWriter.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/72575d0b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala index 3b0a158..dd72f94 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala @@ -86,12 +86,16 @@ object SparkHadoopMapReduceWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { + // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers. + // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. + val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber + executeTask( context = context, jobTrackerId = jobTrackerId, commitJobId = commitJobId, sparkPartitionId = context.partitionId, - sparkAttemptNumber = context.attemptNumber, + sparkAttemptNumber = attemptId, committer = committer, hadoopConf = conf.value, outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]], --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org