Repository: spark
Updated Branches:
  refs/heads/master ad2e63662 -> 38e4699c9


[SPARK-24820][SPARK-24821][CORE] Fail fast when submitted job contains a 
barrier stage with unsupported RDD chain pattern

## What changes were proposed in this pull request?

Check on job submission to make sure we don't launch a barrier stage with an
unsupported RDD chain pattern. The following patterns are not supported (see
the sketch below):
- Ancestor RDDs that have a different number of partitions from the resulting
RDD (e.g. union()/coalesce()/first()/PartitionPruningRDD);
- An RDD that depends on multiple barrier RDDs (e.g.
barrierRdd1.zip(barrierRdd2)).
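
For illustration, a minimal sketch of a chain that now fails fast on submit (this assumes a local `SparkContext` named `sc` and the `RDDBarrier.mapPartitions` signature as of this commit; it is not part of the patch itself):

```scala
// The barrier stage wants 4 tasks (one per partition), but coalesce(1)
// makes the result stage launch only 1 task, so the job is now rejected
// at submission instead of hanging or failing mid-run.
val rdd = sc.parallelize(1 to 10, 4)
  .barrier()
  .mapPartitions((iter, context) => iter)
  .coalesce(1)
rdd.collect()  // throws SparkException with the new error message
```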

## How was this patch tested?

Add test cases in `BarrierStageOnSubmittedSuite`.
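
One way to run the new suite locally is the usual ScalaTest route, e.g. `build/sbt "core/testOnly org.apache.spark.BarrierStageOnSubmittedSuite"` (assuming a standard Spark dev checkout).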

Author: Xingbo Jiang <xingbo.ji...@databricks.com>

Closes #21927 from jiangxb1987/SPARK-24820.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e4699c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e4699c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e4699c

Branch: refs/heads/master
Commit: 38e4699c978e56a0f24b8efb94fd3206cdd8b3fe
Parents: ad2e636
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Thu Aug 2 09:36:26 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Thu Aug 2 09:36:26 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  55 ++++++-
 .../spark/BarrierStageOnSubmittedSuite.scala    | 153 +++++++++++++++++++
 2 files changed, 207 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/38e4699c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4858af7..3dd0718 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
+import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -341,6 +341,22 @@ class DAGScheduler(
   }
 
   /**
+   * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The
+   * following patterns are not supported:
+   * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg.
+   * union()/coalesce()/first()/take()/PartitionPruningRDD);
+   * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
+   */
+  private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], numTasksInStage: Int): Unit = {
+    val predicate: RDD[_] => Boolean = (r =>
+      r.getNumPartitions == numTasksInStage && r.dependencies.filter(_.rdd.isBarrier()).size <= 1)
+    if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) {
+      throw new SparkException(
+        DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+    }
+  }
+
+  /**
    * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
    * previously run stage generated the same shuffle data, this function will copy the output
    * locations that are still available from the previous shuffle to avoid unnecessarily
@@ -348,6 +364,7 @@ class DAGScheduler(
    */
   def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
+    checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
     val numTasks = rdd.partitions.length
     val parents = getOrCreateParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
@@ -376,6 +393,7 @@ class DAGScheduler(
       partitions: Array[Int],
       jobId: Int,
       callSite: CallSite): ResultStage = {
+    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
     val parents = getOrCreateParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
     val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
@@ -451,6 +469,32 @@ class DAGScheduler(
     parents
   }
 
+  /**
+   * Traverses the given RDD and its ancestors within the same stage and checks whether all of the
+   * RDDs satisfy a given predicate.
+   */
+  private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = {
+    val visited = new HashSet[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
+    waitingForVisit.push(rdd)
+    while (waitingForVisit.nonEmpty) {
+      val toVisit = waitingForVisit.pop()
+      if (!visited(toVisit)) {
+        if (!predicate(toVisit)) {
+          return false
+        }
+        visited += toVisit
+        toVisit.dependencies.foreach {
+          case _: ShuffleDependency[_, _, _] =>
+            // Not within the same stage as the current rdd, do nothing.
+          case dependency =>
+            waitingForVisit.push(dependency.rdd)
+        }
+      }
+    }
+    true
+  }
+
   private def getMissingParentStages(stage: Stage): List[Stage] = {
     val missing = new HashSet[Stage]
     val visited = new HashSet[RDD[_]]
@@ -1948,4 +1992,13 @@ private[spark] object DAGScheduler {
 
   // Number of consecutive stage attempts allowed before a stage is aborted
   val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
+
+  // Error message when running a barrier stage that has an unsupported RDD chain pattern.
+  val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
+    "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
+      "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
+      "partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" +
+      "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
+      "(scala) or barrierRdd.collect()[0] (python).\n" +
+      "2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/38e4699c/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
new file mode 100644
index 0000000..f2b3884
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.scheduler.DAGScheduler
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This test suite covers all the cases that should fail fast on job submission when the job
+ * contains one or more barrier stages.
+ */
+class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
+    with LocalSparkContext {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+
+    val conf = new SparkConf()
+      .setMaster("local[4]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+  }
+
+  private def testSubmitJob(
+      sc: SparkContext,
+      rdd: RDD[Int],
+      partitions: Option[Seq[Int]] = None,
+      message: String): Unit = {
+    val futureAction = sc.submitJob(
+      rdd,
+      (iter: Iterator[Int]) => iter.toArray,
+      partitions.getOrElse(0 until rdd.partitions.length),
+      { case (_, _) => return }: (Int, Array[Int]) => Unit,
+      { return }
+    )
+
+    val error = intercept[SparkException] {
+      ThreadUtils.awaitResult(futureAction, 5 seconds)
+    }.getCause.getMessage
+    assert(error.contains(message))
+  }
+
+  test("submit a barrier ResultStage that contains PartitionPruningRDD") {
+    val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
+    val rdd = prunedRdd
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+    testSubmitJob(sc, rdd,
+      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+  }
+
+  test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") {
+    val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
+    val rdd = prunedRdd
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+      .repartition(2)
+      .map(x => x + 1)
+    testSubmitJob(sc, rdd,
+      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+  }
+
+  test("submit a barrier stage that doesn't contain PartitionPruningRDD") {
+    val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
+    val rdd = prunedRdd
+      .repartition(2)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+    // Should be able to submit job and run successfully.
+    val result = rdd.collect().sorted
+    assert(result === Seq(6, 7, 8, 9, 10))
+  }
+
+  test("submit a barrier stage with partial partitions") {
+    val rdd = sc.parallelize(1 to 10, 4)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+    testSubmitJob(sc, rdd, Some(Seq(1, 3)),
+      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+  }
+
+  test("submit a barrier stage with union()") {
+    val rdd1 = sc.parallelize(1 to 10, 2)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+    val rdd2 = sc.parallelize(1 to 20, 2)
+    val rdd3 = rdd1
+      .union(rdd2)
+      .map(x => x * 2)
+    // Fail the job on submit because the barrier RDD (rdd1) may not be assigned Task 0.
+    testSubmitJob(sc, rdd3,
+      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+  }
+
+  test("submit a barrier stage with coalesce()") {
+    val rdd = sc.parallelize(1 to 10, 4)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+      .coalesce(1)
+    // Fail the job on submit because the barrier RDD requires 4 tasks to run, but the stage
+    // only launches 1 task.
+    testSubmitJob(sc, rdd,
+      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+  }
+
+  test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") {
+    val rdd1 = sc.parallelize(1 to 10, 4)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+    val rdd2 = sc.parallelize(11 to 20, 4)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+    val rdd3 = rdd1
+      .zip(rdd2)
+      .map(x => x._1 + x._2)
+    testSubmitJob(sc, rdd3,
+      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+  }
+
+  test("submit a barrier stage with zip()") {
+    val rdd1 = sc.parallelize(1 to 10, 4)
+      .barrier()
+      .mapPartitions((iter, context) => iter)
+    val rdd2 = sc.parallelize(11 to 20, 4)
+    val rdd3 = rdd1
+      .zip(rdd2)
+      .map(x => x._1 + x._2)
+    // Should be able to submit job and run successfully.
+    val result = rdd3.collect().sorted
+    assert(result === Seq(12, 14, 16, 18, 20, 22, 24, 26, 28, 30))
+  }
+}
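
Usage note (a sketch, not part of the diff): per the error message added above, the suggested workaround for first()/take() on a barrier RDD is to collect to the driver and take the head locally, assuming the dataset is small enough to fit in driver memory:

```scala
val barrierRdd = sc.parallelize(1 to 10, 4)
  .barrier()
  .mapPartitions((iter, context) => iter)

// barrierRdd.first() would now fail fast, because first()/take() submit a
// job over fewer partitions than the barrier stage has tasks.
val head = barrierRdd.collect().head
```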

