Repository: spark Updated Branches: refs/heads/master 66c49ed60 -> 9b692bfdf
[SPARK-7826] [CORE] Suppress extra calling getCacheLocs. `DAGScheduler` makes too many redundant calls to the `getCacheLocs` method, which involves Akka communication. To improve `DAGScheduler` performance, this patch suppresses those redundant calls. In my application with over 1200 stages, the execution time dropped from 8.5 min to 3.8 min with this patch. Author: Takuya UESHIN <ues...@happy-camper.st> Closes #6352 from ueshin/issues/SPARK-7826 and squashes the following commits: 3d4d036 [Takuya UESHIN] Modify a test and the documentation. 10b1b22 [Takuya UESHIN] Simplify the unit test. d858b59 [Takuya UESHIN] Move the storageLevel check inside the if (!cacheLocs.contains(rdd.id)) block. 6f3125c [Takuya UESHIN] Fix scalastyle. b9c835c [Takuya UESHIN] Put the condition that checks if the RDD has uncached partition or not into variable for readability. f87f2ec [Takuya UESHIN] Get cached locations from block manager only if the storage level of the RDD is not StorageLevel.NONE. 8248386 [Takuya UESHIN] Revert "Suppress extra calling getCacheLocs." a4d944a [Takuya UESHIN] Add an unit test. 9a80fad [Takuya UESHIN] Suppress extra calling getCacheLocs. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b692bfd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b692bfd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b692bfd Branch: refs/heads/master Commit: 9b692bfdfcc91b32498865d21138cf215a378665 Parents: 66c49ed Author: Takuya UESHIN <ues...@happy-camper.st> Authored: Thu May 28 19:05:12 2015 -0700 Committer: Kay Ousterhout <kayousterh...@gmail.com> Committed: Thu May 28 19:05:12 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 15 ++++++--- .../spark/scheduler/DAGSchedulerSuite.scala | 35 +++++++++++++++++--- 2 files changed, 42 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9b692bfd/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a083be2..a2299e9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -193,9 +193,15 @@ class DAGScheduler( def getCacheLocs(rdd: RDD[_]): Seq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times if (!cacheLocs.contains(rdd.id)) { - val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] - val locs: Seq[Seq[TaskLocation]] = blockManagerMaster.getLocations(blockIds).map { bms => - bms.map(bm => TaskLocation(bm.host, bm.executorId)) + // Note: if the storage level is NONE, we don't need to get locations from block manager. 
+ val locs: Seq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) { + Seq.fill(rdd.partitions.size)(Nil) + } else { + val blockIds = + rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] + blockManagerMaster.getLocations(blockIds).map { bms => + bms.map(bm => TaskLocation(bm.host, bm.executorId)) + } } cacheLocs(rdd.id) = locs } @@ -382,7 +388,8 @@ class DAGScheduler( def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd - if (getCacheLocs(rdd).contains(Nil)) { + val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) + if (rddHasUncachedPartitions) { for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => http://git-wip-us.apache.org/repos/asf/spark/blob/9b692bfd/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6a8ae29..4664223 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -318,7 +318,7 @@ class DAGSchedulerSuite } test("cache location preferences w/ dependency") { - val baseRdd = new MyRDD(sc, 1, Nil) + val baseRdd = new MyRDD(sc, 1, Nil).cache() val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) cacheLocations(baseRdd.id -> 0) = Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) @@ -331,7 +331,7 @@ class DAGSchedulerSuite } test("regression test for getCacheLocs") { - val rdd = new MyRDD(sc, 3, Nil) + val rdd = new MyRDD(sc, 3, Nil).cache() cacheLocations(rdd.id -> 0) = Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) cacheLocations(rdd.id -> 1) = @@ -342,6 +342,33 @@ class DAGSchedulerSuite assert(locs === Seq(Seq("hostA", "hostB"), Seq("hostB", "hostC"), 
Seq("hostC", "hostD"))) } + /** + * This test ensures that if a particular RDD is cached, RDDs earlier in the dependency chain + * are not computed. It constructs the following chain of dependencies: + * +---+ shuffle +---+ +---+ +---+ + * | A |<--------| B |<---| C |<---| D | + * +---+ +---+ +---+ +---+ + * Here, B is derived from A by performing a shuffle, C has a one-to-one dependency on B, + * and D similarly has a one-to-one dependency on C. If none of the RDDs were cached, this + * set of RDDs would result in a two stage job: one ShuffleMapStage, and a ResultStage that + * reads the shuffled data from RDD A. This test ensures that if C is cached, the scheduler + * doesn't perform a shuffle, and instead computes the result using a single ResultStage + * that reads C's cached data. + */ + test("getMissingParentStages should consider all ancestor RDDs' cache statuses") { + val rddA = new MyRDD(sc, 1, Nil) + val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, null))) + val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))).cache() + val rddD = new MyRDD(sc, 1, List(new OneToOneDependency(rddC))) + cacheLocations(rddC.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + submit(rddD, Array(0)) + assert(scheduler.runningStages.size === 1) + // Make sure that the scheduler is running the final result stage. + // Because C is cached, the shuffle map stage to compute A does not need to be run. 
+ assert(scheduler.runningStages.head.isInstanceOf[ResultStage]) + } + test("avoid exponential blowup when getting preferred locs list") { // Build up a complex dependency graph with repeated zip operations, without preferred locations var rdd: RDD[_] = new MyRDD(sc, 1, Nil) @@ -678,9 +705,9 @@ class DAGSchedulerSuite } test("cached post-shuffle") { - val shuffleOneRdd = new MyRDD(sc, 2, Nil) + val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache() val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) - val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)) + val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache() val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo)) submit(finalRdd, Array(0)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org