Repository: spark Updated Branches: refs/heads/master ad2e63662 -> 38e4699c9
[SPARK-24820][SPARK-24821][CORE] Fail fast when submitted job contains a barrier stage with unsupported RDD chain pattern ## What changes were proposed in this pull request? Check on job submit to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The following patterns are not supported: - Ancestor RDDs that have different number of partitions from the resulting RDD (eg. union()/coalesce()/first()/PartitionPruningRDD); - An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)). ## How was this patch tested? Add test cases in `BarrierStageOnSubmittedSuite`. Author: Xingbo Jiang <xingbo.ji...@databricks.com> Closes #21927 from jiangxb1987/SPARK-24820. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e4699c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e4699c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e4699c Branch: refs/heads/master Commit: 38e4699c978e56a0f24b8efb94fd3206cdd8b3fe Parents: ad2e636 Author: Xingbo Jiang <xingbo.ji...@databricks.com> Authored: Thu Aug 2 09:36:26 2018 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Thu Aug 2 09:36:26 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 55 ++++++- .../spark/BarrierStageOnSubmittedSuite.scala | 153 +++++++++++++++++++ 2 files changed, 207 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/38e4699c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4858af7..3dd0718 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.rdd.{RDD, RDDCheckpointData} +import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData} import org.apache.spark.rpc.RpcTimeout import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat @@ -341,6 +341,22 @@ class DAGScheduler( } /** + * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The + * following patterns are not supported: + * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg. + * union()/coalesce()/first()/take()/PartitionPruningRDD); + * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)). + */ + private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], numTasksInStage: Int): Unit = { + val predicate: RDD[_] => Boolean = (r => + r.getNumPartitions == numTasksInStage && r.dependencies.filter(_.rdd.isBarrier()).size <= 1) + if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) { + throw new SparkException( + DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + } + } + + /** * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. 
If a * previously run stage generated the same shuffle data, this function will copy the output * locations that are still available from the previous shuffle to avoid unnecessarily @@ -348,6 +364,7 @@ class DAGScheduler( */ def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd + checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() @@ -376,6 +393,7 @@ class DAGScheduler( partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { + checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) @@ -451,6 +469,32 @@ class DAGScheduler( parents } + /** + * Traverses the given RDD and its ancestors within the same stage and checks whether all of the + * RDDs satisfy a given predicate. + */ + private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = { + val visited = new HashSet[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] + waitingForVisit.push(rdd) + while (waitingForVisit.nonEmpty) { + val toVisit = waitingForVisit.pop() + if (!visited(toVisit)) { + if (!predicate(toVisit)) { + return false + } + visited += toVisit + toVisit.dependencies.foreach { + case _: ShuffleDependency[_, _, _] => + // Not within the same stage with current rdd, do nothing. 
+ case dependency => + waitingForVisit.push(dependency.rdd) + } + } + } + true + } + private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] @@ -1948,4 +1992,13 @@ private[spark] object DAGScheduler { // Number of consecutive stage attempts allowed before a stage is aborted val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4 + + // Error message when running a barrier stage that has an unsupported RDD chain pattern. + val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN = + "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " + + "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " + + "partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" + + "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " + + "(scala) or barrierRdd.collect()[0] (python).\n" + + "2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))." } http://git-wip-us.apache.org/repos/asf/spark/blob/38e4699c/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala new file mode 100644 index 0000000..f2b3884 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.rdd.{PartitionPruningRDD, RDD} +import org.apache.spark.scheduler.DAGScheduler +import org.apache.spark.util.ThreadUtils + +/** + * This test suite covers all the cases that shall fail fast when a submitted job contains one + * or more barrier stages. + */ +class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach + with LocalSparkContext { + + override def beforeEach(): Unit = { + super.beforeEach() + + val conf = new SparkConf() + .setMaster("local[4]") + .setAppName("test") + sc = new SparkContext(conf) + } + + private def testSubmitJob( + sc: SparkContext, + rdd: RDD[Int], + partitions: Option[Seq[Int]] = None, + message: String): Unit = { + val futureAction = sc.submitJob( + rdd, + (iter: Iterator[Int]) => iter.toArray, + partitions.getOrElse(0 until rdd.partitions.length), + { case (_, _) => return }: (Int, Array[Int]) => Unit, + { return } + ) + + val error = intercept[SparkException] { + ThreadUtils.awaitResult(futureAction, 5 seconds) + }.getCause.getMessage + assert(error.contains(message)) + } + + test("submit a barrier ResultStage that contains PartitionPruningRDD") { + val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1) + val rdd = prunedRdd + .barrier() + .mapPartitions((iter, context) => iter) + testSubmitJob(sc, rdd, + message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + } 
+ + test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") { + val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1) + val rdd = prunedRdd + .barrier() + .mapPartitions((iter, context) => iter) + .repartition(2) + .map(x => x + 1) + testSubmitJob(sc, rdd, + message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + } + + test("submit a barrier stage that doesn't contain PartitionPruningRDD") { + val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1) + val rdd = prunedRdd + .repartition(2) + .barrier() + .mapPartitions((iter, context) => iter) + // Should be able to submit job and run successfully. + val result = rdd.collect().sorted + assert(result === Seq(6, 7, 8, 9, 10)) + } + + test("submit a barrier stage with partial partitions") { + val rdd = sc.parallelize(1 to 10, 4) + .barrier() + .mapPartitions((iter, context) => iter) + testSubmitJob(sc, rdd, Some(Seq(1, 3)), + message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + } + + test("submit a barrier stage with union()") { + val rdd1 = sc.parallelize(1 to 10, 2) + .barrier() + .mapPartitions((iter, context) => iter) + val rdd2 = sc.parallelize(1 to 20, 2) + val rdd3 = rdd1 + .union(rdd2) + .map(x => x * 2) + // Fail the job on submit because the barrier RDD (rdd1) may not be assigned Task 0. + testSubmitJob(sc, rdd3, + message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + } + + test("submit a barrier stage with coalesce()") { + val rdd = sc.parallelize(1 to 10, 4) + .barrier() + .mapPartitions((iter, context) => iter) + .coalesce(1) + // Fail the job on submit because the barrier RDD requires to run on 4 tasks, but the stage + // only launches 1 task. 
+ testSubmitJob(sc, rdd, + message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + } + + test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") { + val rdd1 = sc.parallelize(1 to 10, 4) + .barrier() + .mapPartitions((iter, context) => iter) + val rdd2 = sc.parallelize(11 to 20, 4) + .barrier() + .mapPartitions((iter, context) => iter) + val rdd3 = rdd1 + .zip(rdd2) + .map(x => x._1 + x._2) + testSubmitJob(sc, rdd3, + message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + } + + test("submit a barrier stage with zip()") { + val rdd1 = sc.parallelize(1 to 10, 4) + .barrier() + .mapPartitions((iter, context) => iter) + val rdd2 = sc.parallelize(11 to 20, 4) + val rdd3 = rdd1 + .zip(rdd2) + .map(x => x._1 + x._2) + // Should be able to submit job and run successfully. + val result = rdd3.collect().sorted + assert(result === Seq(12, 14, 16, 18, 20, 22, 24, 26, 28, 30)) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org