Repository: spark
Updated Branches:
  refs/heads/branch-2.3 31dab7140 -> d22379ec2


[SPARK-23243][CORE][2.3] Fix RDD.repartition() data correctness issue

backport https://github.com/apache/spark/pull/22112 to 2.3

-------

An alternative fix for https://github.com/apache/spark/pull/21698

When Spark reruns the tasks of an RDD, there are 3 possible behaviors:
1. determinate. Always returns the same result in the same order when rerun.
2. unordered. Returns the same data set, possibly in a different order, when rerun.
3. indeterminate. May return a different result when rerun.
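
As a rough illustration of these three behaviors, here is a minimal sketch that is
independent of Spark. `RerunLevel` and `combine` are hypothetical names used only
here. The patch below has its own `DeterministicLevel` enumeration, and `combine`
only mirrors its "least deterministic parent wins" rule.

```scala
// Hypothetical stand-in for the three behaviors; not Spark's own API.
object RerunLevel extends Enumeration {
  // Declared from most to least deterministic, so a larger id means
  // "less deterministic".
  val Determinate, Unordered, Indeterminate = Value

  // An RDD without parents is treated as determinate; otherwise the
  // least deterministic parent decides the level.
  def combine(parents: Seq[Value]): Value =
    if (parents.isEmpty) Determinate else parents.maxBy(_.id)
}
```

For example, `RerunLevel.combine(Seq(RerunLevel.Determinate, RerunLevel.Unordered))`
evaluates to `RerunLevel.Unordered`.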

Normally Spark doesn't need to care about this. Spark runs stages one by one,
and when a task fails, Spark just reruns it. Although the rerun task may return
a different result, users will not be surprised.

However, Spark may rerun a finished stage when it sees fetch failures. When this
happens, Spark needs to rerun all the tasks of all the succeeding stages if the
RDD output is indeterminate, because the input of the succeeding stages has
changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the 
succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's the same as the determinate case, because
a shuffle partitioner is always deterministic (the round-robin partitioner is
not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the
reducers will still get the same input data set.
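
To make the unordered case concrete, here is a small sketch in plain Scala
collections, not Spark code. `KeyPartitionDemo`, `partitionOf` and `shuffle` are
hypothetical names, and the non-negative hash modulus is only an assumption about
what a key-based partitioner looks like.

```scala
object KeyPartitionDemo {
  // Deterministic, key-based assignment: a non-negative modulus of the
  // key's hash code.
  def partitionOf(key: Int, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Group records by target partition, ignoring the order in which the
  // records arrive within each partition.
  def shuffle(records: Seq[Int], numPartitions: Int): Map[Int, Set[Int]] =
    records.groupBy(partitionOf(_, numPartitions)).map { case (p, rs) => p -> rs.toSet }
}
```

Under these assumptions, `KeyPartitionDemo.shuffle(Seq(1, 2, 3, 4, 5), 2)` and
`KeyPartitionDemo.shuffle(Seq(5, 3, 1, 4, 2), 2)` produce the same map, because
the target partition depends only on the key and not on the arrival order.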

This PR fixes the failure handling for `repartition`, to avoid correctness
issues.

`repartition` applies a stateful map function to generate a round-robin id,
which is order sensitive and makes the RDD's output indeterminate. When a
stage that contains `repartition` reruns, we must also rerun all the tasks of
all the succeeding stages.
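
The order sensitivity can be seen in a simplified sketch using plain Scala
collections rather than Spark. `RoundRobinDemo` and `distribute` are hypothetical
names. The counter here always starts at 0, which is a simplification and not
necessarily how `repartition` picks its starting position.

```scala
object RoundRobinDemo {
  // Assign records to partitions in arrival order. The counter is state
  // carried from one record to the next, which is what makes the
  // function order sensitive.
  def distribute(records: Seq[String], numPartitions: Int): Map[Int, Set[String]] = {
    var position = 0
    val tagged = records.map { r =>
      val target = position % numPartitions
      position += 1
      target -> r
    }
    // Collect the records of each partition as a set, ignoring order.
    tagged.groupBy(_._1).map { case (p, rs) => p -> rs.map(_._2).toSet }
  }
}
```

With this sketch, `distribute(Seq("a", "b", "c", "d"), 2)` puts `a` and `c` in
partition 0, while `distribute(Seq("b", "a", "c", "d"), 2)` puts `b` and `c`
there. The input data set is the same, but each reducer sees a different input,
which is why an unordered input makes the output indeterminate.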

**future improvement:**
1. Currently we can't roll back and rerun a shuffle map stage, so we just fail
the job. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't roll back and rerun a result stage, so we just fail the
job. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide a public API to allow users to tag the deterministic level
of the RDD's computing function.

a new test case

Closes #22354 from cloud-fan/repartition.

Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d22379ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d22379ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d22379ec

Branch: refs/heads/branch-2.3
Commit: d22379ec2a01fb3aa2121c312a863f057a5761ed
Parents: 31dab71
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Sep 7 10:52:45 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Sep 7 10:52:45 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    |   3 +
 .../org/apache/spark/rdd/MapPartitionsRDD.scala |  21 ++-
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 100 ++++++++++-
 .../apache/spark/scheduler/DAGScheduler.scala   |  59 ++++++-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 169 ++++++++++++++++++-
 .../exchange/ShuffleExchangeExec.scala          |  17 +-
 6 files changed, 352 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d22379ec/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index c940cb2..5152375 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -33,6 +33,9 @@ import org.apache.spark.util.random.SamplingUtils
 /**
  * An object that defines how the elements in a key-value pair RDD are 
partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
+ *
+ * Note that, partitioner must be deterministic, i.e. it must return the same 
partition id given
+ * the same partition key.
  */
 abstract class Partitioner extends Serializable {
   def numPartitions: Int

http://git-wip-us.apache.org/repos/asf/spark/blob/d22379ec/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index e4587c9..15128f0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -23,11 +23,22 @@ import org.apache.spark.{Partition, TaskContext}
 
 /**
  * An RDD that applies the provided function to every partition of the parent 
RDD.
+ *
+ * @param prev the parent RDD.
+ * @param f The function used to map a tuple of (TaskContext, partition index, 
input iterator) to
+ *          an output iterator.
+ * @param preservesPartitioning Whether the input function preserves the 
partitioner, which should
+ *                              be `false` unless `prev` is a pair RDD and the 
input function
+ *                              doesn't modify the keys.
+ * @param isOrderSensitive whether or not the function is order-sensitive. If 
it's order
+ *                         sensitive, it may return totally different result 
when the input order
+ *                         is changed. Mostly stateful functions are 
order-sensitive.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, 
partition index, iterator)
-    preservesPartitioning: Boolean = false)
+    preservesPartitioning: Boolean = false,
+    isOrderSensitive: Boolean = false)
   extends RDD[U](prev) {
 
   override val partitioner = if (preservesPartitioning) 
firstParent[T].partitioner else None
@@ -41,4 +52,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: 
ClassTag](
     super.clearDependencies()
     prev = null
   }
+
+  override protected def getOutputDeterministicLevel = {
+    if (isOrderSensitive && prev.outputDeterministicLevel == 
DeterministicLevel.UNORDERED) {
+      DeterministicLevel.INDETERMINATE
+    } else {
+      super.getOutputDeterministicLevel
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d22379ec/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0574abd..9d396ce 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -462,8 +462,9 @@ abstract class RDD[T: ClassTag](
 
       // include a shuffle step so that our upstream tasks are still 
distributed
       new CoalescedRDD(
-        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
-        new HashPartitioner(numPartitions)),
+        new ShuffledRDD[Int, T, T](
+          mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive 
= true),
+          new HashPartitioner(numPartitions)),
         numPartitions,
         partitionCoalescer).values
     } else {
@@ -807,16 +808,21 @@ abstract class RDD[T: ClassTag](
    * serializable and don't require closure cleaning.
    *
    * @param preservesPartitioning indicates whether the input function 
preserves the partitioner,
-   * which should be `false` unless this is a pair RDD and the input function 
doesn't modify
-   * the keys.
+   *                              which should be `false` unless this is a 
pair RDD and the input
+   *                              function doesn't modify the keys.
+   * @param isOrderSensitive whether or not the function is order-sensitive. 
If it's order
+   *                         sensitive, it may return totally different result 
when the input order
+   *                         is changed. Mostly stateful functions are 
order-sensitive.
    */
   private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false): RDD[U] = withScope {
+      preservesPartitioning: Boolean = false,
+      isOrderSensitive: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
       (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
-      preservesPartitioning)
+      preservesPartitioning = preservesPartitioning,
+      isOrderSensitive = isOrderSensitive)
   }
 
   /**
@@ -1637,6 +1643,16 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return whether this RDD is reliably checkpointed and materialized.
+   */
+  private[rdd] def isReliablyCheckpointed: Boolean = {
+    checkpointData match {
+      case Some(reliable: ReliableRDDCheckpointData[_]) if 
reliable.isCheckpointed => true
+      case _ => false
+    }
+  }
+
+  /**
    * Gets the name of the directory to which this RDD was checkpointed.
    * This is not defined if the RDD is checkpointed locally.
    */
@@ -1839,6 +1855,63 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Returns the deterministic level of this RDD's output. Please refer to 
[[DeterministicLevel]]
+   * for the definition.
+   *
+   * By default, an reliably checkpointed RDD, or RDD without parents(root 
RDD) is DETERMINATE. For
+   * RDDs with parents, we will generate a deterministic level candidate per 
parent according to
+   * the dependency. The deterministic level of the current RDD is the 
deterministic level
+   * candidate that is deterministic least. Please override 
[[getOutputDeterministicLevel]] to
+   * provide custom logic of calculating output deterministic level.
+   */
+  // TODO: make it public so users can set deterministic level to their custom 
RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different 
deterministic level for
+  // different partitions.
+  private[spark] final lazy val outputDeterministicLevel: 
DeterministicLevel.Value = {
+    if (isReliablyCheckpointed) {
+      DeterministicLevel.DETERMINATE
+    } else {
+      getOutputDeterministicLevel
+    }
+  }
+
+  @DeveloperApi
+  protected def getOutputDeterministicLevel: DeterministicLevel.Value = {
+    val deterministicLevelCandidates = dependencies.map {
+      // The shuffle is not really happening, treat it like narrow dependency 
and assume the output
+      // deterministic level of current RDD is same as parent.
+      case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == 
dep.partitioner) =>
+        dep.rdd.outputDeterministicLevel
+
+      case dep: ShuffleDependency[_, _, _] =>
+        if (dep.rdd.outputDeterministicLevel == 
DeterministicLevel.INDETERMINATE) {
+          // If map output was indeterminate, shuffle output will be 
indeterminate as well
+          DeterministicLevel.INDETERMINATE
+        } else if (dep.keyOrdering.isDefined && dep.aggregator.isDefined) {
+          // if aggregator specified (and so unique keys) and key ordering 
specified - then
+          // consistent ordering.
+          DeterministicLevel.DETERMINATE
+        } else {
+          // In Spark, the reducer fetches multiple remote shuffle blocks at 
the same time, and
+          // the arrival order of these shuffle blocks are totally random. 
Even if the parent map
+          // RDD is DETERMINATE, the reduce RDD is always UNORDERED.
+          DeterministicLevel.UNORDERED
+        }
+
+      // For narrow dependency, assume the output deterministic level of 
current RDD is same as
+      // parent.
+      case dep => dep.rdd.outputDeterministicLevel
+    }
+
+    if (deterministicLevelCandidates.isEmpty) {
+      // By default we assume the root RDD is determinate.
+      DeterministicLevel.DETERMINATE
+    } else {
+      deterministicLevelCandidates.maxBy(_.id)
+    }
+  }
 }
 
 
@@ -1892,3 +1965,18 @@ object RDD {
     new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
   }
 }
+
+/**
+ * The deterministic level of RDD's output (i.e. what `RDD#compute` returns). 
This explains how
+ * the output will diff when Spark reruns the tasks for the RDD. There are 3 
deterministic levels:
+ * 1. DETERMINATE: The RDD output is always the same data set in the same 
order after a rerun.
+ * 2. UNORDERED: The RDD output is always the same data set but the order can 
be different
+ *               after a rerun.
+ * 3. INDETERMINATE. The RDD output can be different after a rerun.
+ *
+ * Note that, the output of an RDD usually relies on the parent RDDs. When the 
parent RDD's output
+ * is INDETERMINATE, it's very likely the RDD's output is also INDETERMINATE.
+ */
+private[spark] object DeterministicLevel extends Enumeration {
+  val DETERMINATE, UNORDERED, INDETERMINATE = Value
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d22379ec/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0df38f1..3a99d13 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, 
ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
+import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -1352,6 +1352,63 @@ class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks 
may return
+              // different result when re-try, we need to re-try all the tasks 
of the failed
+              // stage and its succeeding stages, because the input data will 
be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
+              // guaranteed to be determinate, so the input data of the 
reducers will not change
+              // even if the map tasks are re-tried.
+              if (mapStage.rdd.outputDeterministicLevel == 
DeterministicLevel.INDETERMINATE) {
+                // It's a little tricky to find all the succeeding stages of 
`failedStage`, because
+                // each stage only know its parents not children. Here we 
traverse the stages from
+                // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
+                // in the stage chains that connect to the `failedStage`. To 
speed up the stage
+                // traversing, we collect the stages to rollback first. If a 
stage needs to
+                // rollback, all its succeeding stages need to rollback to.
+                val stagesToRollback = 
scala.collection.mutable.HashSet(failedStage)
+
+                def collectStagesToRollback(stageChain: List[Stage]): Unit = {
+                  if (stagesToRollback.contains(stageChain.head)) {
+                    stageChain.drop(1).foreach(s => stagesToRollback += s)
+                  } else {
+                    stageChain.head.parents.foreach { s =>
+                      collectStagesToRollback(s :: stageChain)
+                    }
+                  }
+                }
+
+                def generateErrorMessage(stage: Stage): String = {
+                  "A shuffle map stage with indeterminate output was failed 
and retried. " +
+                    s"However, Spark cannot rollback the $stage to re-process 
the input data, " +
+                    "and has to fail this job. Please eliminate the 
indeterminacy by " +
+                    "checkpointing the RDD before repartition and try again."
+                }
+
+                activeJobs.foreach(job => 
collectStagesToRollback(job.finalStage :: Nil))
+
+                stagesToRollback.foreach {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = 
mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      // TODO: support to rollback shuffle files.
+                      // Currently the shuffle writing is "first write wins", 
so we can't re-run a
+                      // shuffle map stage and overwrite existing shuffle 
files. We have to finish
+                      // SPARK-8029 first.
+                      abortStage(mapStage, generateErrorMessage(mapStage), 
None)
+                    }
+
+                  case resultStage: ResultStage if 
resultStage.activeJob.isDefined =>
+                    val numMissingPartitions = 
resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      abortStage(resultStage, 
generateErrorMessage(resultStage), None)
+                    }
+
+                  case _ =>
+                }
+              }
+
               // We expect one executor failure to trigger many FetchFailures 
in rapid succession,
               // but all of those task failures can typically be handled by a 
single resubmission of
               // the failed stage.  We avoid flooding the scheduler's event 
queue with resubmit

http://git-wip-us.apache.org/repos/asf/spark/blob/d22379ec/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c07b4a9..efdd11e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{DeterministicLevel, RDD}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, 
MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -56,6 +56,20 @@ class DAGSchedulerEventProcessLoopTester(dagScheduler: 
DAGScheduler)
 
 }
 
+class MyCheckpointRDD(
+    sc: SparkContext,
+    numPartitions: Int,
+    dependencies: List[Dependency[_]],
+    locations: Seq[Seq[String]] = Nil,
+    @(transient @param) tracker: MapOutputTrackerMaster = null,
+    indeterminate: Boolean = false)
+  extends MyRDD(sc, numPartitions, dependencies, locations, tracker, 
indeterminate) {
+
+  // Allow doCheckpoint() on this RDD.
+  override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
Int)] =
+    Iterator.empty
+}
+
 /**
  * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
  * preferredLocations (if any) that are passed to them. They are deliberately 
not executable
@@ -70,7 +84,8 @@ class MyRDD(
     numPartitions: Int,
     dependencies: List[Dependency[_]],
     locations: Seq[Seq[String]] = Nil,
-    @(transient @param) tracker: MapOutputTrackerMaster = null)
+    @(transient @param) tracker: MapOutputTrackerMaster = null,
+    indeterminate: Boolean = false)
   extends RDD[(Int, Int)](sc, dependencies) with Serializable {
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, 
Int)] =
@@ -80,6 +95,10 @@ class MyRDD(
     override def index: Int = i
   }).toArray
 
+  override protected def getOutputDeterministicLevel = {
+    if (indeterminate) DeterministicLevel.INDETERMINATE else 
super.getOutputDeterministicLevel
+  }
+
   override def getPreferredLocations(partition: Partition): Seq[String] = {
     if (locations.isDefinedAt(partition.index)) {
       locations(partition.index)
@@ -2457,6 +2476,152 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     }
   }
 
+  test("SPARK-23207: retry all the succeeding stages when the map stage is 
indeterminate") {
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true)
+
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new 
HashPartitioner(2))
+    val shuffleId1 = shuffleDep1.shuffleId
+    val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = 
mapOutputTracker)
+
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new 
HashPartitioner(2))
+    val shuffleId2 = shuffleDep2.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep2), tracker = 
mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    // Finish the first shuffle map stage.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === 
Some(Seq.empty))
+
+    // Finish the second shuffle map stage.
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostC", 2)),
+      (Success, makeMapStatus("hostD", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === 
Some(Seq.empty))
+
+    // The first task of the final stage failed with fetch failure
+    runEvent(makeCompletionEvent(
+      taskSets(2).tasks(0),
+      FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
+      null))
+
+    val failedStages = scheduler.failedStages.toSeq
+    assert(failedStages.length == 2)
+    // Shuffle blocks of "hostC" is lost, so first task of the 
`shuffleMapRdd2` needs to retry.
+    assert(failedStages.collect {
+      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 
=> stage
+    }.head.findMissingPartitions() == Seq(0))
+    // The result stage is still waiting for its 2 tasks to complete
+    assert(failedStages.collect {
+      case stage: ResultStage => stage
+    }.head.findMissingPartitions() == Seq(0, 1))
+
+    scheduler.resubmitFailedStages()
+
+    // The first task of the `shuffleMapRdd2` failed with fetch failure
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
+      null))
+
+    // The job should fail because Spark can't rollback the shuffle map stage.
+    assert(failure != null && failure.getMessage.contains("Spark cannot 
rollback"))
+  }
+
+  private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
+    val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(taskSets.length - 1, 0, 
numShufflePartitions = 2)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId) === 
Some(Seq.empty))
+
+    // Finish the first task of the result stage
+    runEvent(makeCompletionEvent(
+      taskSets.last.tasks(0), Success, 42,
+      Seq.empty, createFakeTaskInfoWithId(0)))
+
+    // Fail the second task with FetchFailed.
+    runEvent(makeCompletionEvent(
+      taskSets.last.tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
+      null))
+
+    // The job should fail because Spark can't rollback the result stage.
+    assert(failure != null && failure.getMessage.contains("Spark cannot 
rollback"))
+  }
+
+  test("SPARK-23207: cannot rollback a result stage") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil, indeterminate = true)
+    assertResultStageFailToRollback(shuffleMapRdd)
+  }
+
+  test("SPARK-23207: local checkpoint fail to rollback (checkpointed before)") 
{
+    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+    shuffleMapRdd.localCheckpoint()
+    shuffleMapRdd.doCheckpoint()
+    assertResultStageFailToRollback(shuffleMapRdd)
+  }
+
+  test("SPARK-23207: local checkpoint fail to rollback (checkpointing now)") {
+    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+    shuffleMapRdd.localCheckpoint()
+    assertResultStageFailToRollback(shuffleMapRdd)
+  }
+
+  private def assertResultStageNotRollbacked(mapRdd: MyRDD): Unit = {
+    val shuffleDep = new ShuffleDependency(mapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(taskSets.length - 1, 0, 
numShufflePartitions = 2)
+    assert(mapOutputTracker.findMissingPartitions(shuffleId) === 
Some(Seq.empty))
+
+    // Finish the first task of the result stage
+    runEvent(makeCompletionEvent(
+      taskSets.last.tasks(0), Success, 42,
+      Seq.empty, createFakeTaskInfoWithId(0)))
+
+    // Fail the second task with FetchFailed.
+    runEvent(makeCompletionEvent(
+      taskSets.last.tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
+      null))
+
+    assert(failure == null, "job should not fail")
+    val failedStages = scheduler.failedStages.toSeq
+    assert(failedStages.length == 2)
+    // Shuffle blocks of "hostA" is lost, so first task of the 
`shuffleMapRdd2` needs to retry.
+    assert(failedStages.collect {
+      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId 
=> stage
+    }.head.findMissingPartitions() == Seq(0))
+    // The first task of result stage remains completed.
+    assert(failedStages.collect {
+      case stage: ResultStage => stage
+    }.head.findMissingPartitions() == Seq(1))
+  }
+
+  test("SPARK-23207: reliable checkpoint can avoid rollback (checkpointed 
before)") {
+    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+    shuffleMapRdd.checkpoint()
+    shuffleMapRdd.doCheckpoint()
+    assertResultStageNotRollbacked(shuffleMapRdd)
+  }
+
+  test("SPARK-23207: reliable checkpoint fail to rollback (checkpointing 
now)") {
+    sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+    val shuffleMapRdd = new MyCheckpointRDD(sc, 2, Nil, indeterminate = true)
+    shuffleMapRdd.checkpoint()
+    assertResultStageFailToRollback(shuffleMapRdd)
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its 
preferred locations.
    * Note that this checks only the host and not the executor ID.

http://git-wip-us.apache.org/repos/asf/spark/blob/d22379ec/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index b892037..aba9488 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -250,6 +250,9 @@ object ShuffleExchangeExec {
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
     }
 
+    val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
+      newPartitioning.numPartitions > 1
+
     val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
       // [SPARK-23207] Have to make sure the generated RoundRobinPartitioning 
is deterministic,
       // otherwise a retry task may output different rows and thus lead to 
data loss.
@@ -259,9 +262,7 @@ object ShuffleExchangeExec {
       //
       // Note that we don't perform local sort if the new partitioning has 
only 1 partition, under
       // that case all output rows go to the same partition.
-      val newRdd = if (SQLConf.get.sortBeforeRepartition &&
-          newPartitioning.numPartitions > 1 &&
-          newPartitioning.isInstanceOf[RoundRobinPartitioning]) {
+      val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) {
         rdd.mapPartitionsInternal { iter =>
           val recordComparatorSupplier = new Supplier[RecordComparator] {
             override def get: RecordComparator = new RecordBinaryComparator()
@@ -297,17 +298,19 @@ object ShuffleExchangeExec {
         rdd
       }
 
+      // round-robin function is order sensitive if we don't sort the input.
+      val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
       if (needToCopyObjectsBeforeShuffle(part)) {
-        newRdd.mapPartitionsInternal { iter =>
+        newRdd.mapPartitionsWithIndexInternal((_, iter) => {
           val getPartitionKey = getPartitionKeyExtractor()
           iter.map { row => (part.getPartition(getPartitionKey(row)), 
row.copy()) }
-        }
+        }, isOrderSensitive = isOrderSensitive)
       } else {
-        newRdd.mapPartitionsInternal { iter =>
+        newRdd.mapPartitionsWithIndexInternal((_, iter) => {
           val getPartitionKey = getPartitionKeyExtractor()
           val mutablePair = new MutablePair[Int, InternalRow]()
           iter.map { row => 
mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
-        }
+        }, isOrderSensitive = isOrderSensitive)
       }
     }
 

