Repository: spark
Updated Branches:
  refs/heads/branch-2.2 a60000459 -> 72575d0bb


[SPARK-24552][CORE][BRANCH-2.2] Use unique id instead of attempt number for writes.

This passes a unique attempt id to the Hadoop APIs, because attempt
number is reused when stages are retried. When attempt numbers are
reused, sources that track data by partition id and attempt number
may incorrectly clean up data because the same attempt number can
be both committed and aborted.
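
For illustration only, a minimal, self-contained Scala sketch (names
hypothetical, not part of this commit) of how packing the stage attempt
number into the high 16 bits keeps IDs from retried stages distinct:

  // Hypothetical sketch: combine the stage attempt and task attempt numbers
  // into one unique ID, assuming both stay below Short.MaxValue.
  object AttemptIdSketch {
    def uniqueAttemptId(stageAttemptNumber: Int, taskAttemptNumber: Int): Int =
      (stageAttemptNumber << 16) | taskAttemptNumber

    def main(args: Array[String]): Unit = {
      // Task attempt 3 of stage attempt 0 vs. stage attempt 1 no longer collide.
      println(uniqueAttemptId(0, 3))  // 3
      println(uniqueAttemptId(1, 3))  // 65539
    }
  }

Because neither value is expected to exceed Short.MaxValue, the two 16-bit
halves cannot overlap, so the combined ID is unique per stage retry.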

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #21616 from vanzin/SPARK-24552-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72575d0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72575d0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72575d0b

Branch: refs/heads/branch-2.2
Commit: 72575d0bb575cb98798a3ffe08204e97971bfa19
Parents: a600004
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Mon Jun 25 16:56:12 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Jun 25 16:56:12 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/internal/io/SparkHadoopMapReduceWriter.scala  | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72575d0b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 3b0a158..dd72f94 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -86,12 +86,16 @@ object SparkHadoopMapReduceWriter extends Logging {
     // Try to write all RDD partitions as a Hadoop OutputFormat.
     try {
      val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+        // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
+        // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+        val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
+
         executeTask(
           context = context,
           jobTrackerId = jobTrackerId,
           commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
-          sparkAttemptNumber = context.attemptNumber,
+          sparkAttemptNumber = attemptId,
           committer = committer,
           hadoopConf = conf.value,
           outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]],

