venkata91 commented on a change in pull request #30691: URL: https://github.com/apache/spark/pull/30691#discussion_r626224298
########## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ########## @@ -3393,6 +3406,271 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(rprofsE === Set()) } + private def initPushBasedShuffleConfs(conf: SparkConf) = { + conf.set(config.SHUFFLE_SERVICE_ENABLED, true) + conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true) + conf.set("spark.master", "pushbasedshuffleclustermanager") + } + + test("SPARK-32920: shuffle merge finalization") { + initPushBasedShuffleConfs(conf) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 2 + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + + // Submit a reduce job that depends which will create a map stage + submit(reduceRdd, (0 until parts).toArray) + completeShuffleMapStageSuccessfully(0, 0, parts) + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts) + completeNextResultStageWithSuccess(1, 0) + assert(results === Map(0 -> 42, 1 -> 42)) + results.clear() + assertDataStructuresEmpty() + } + + test("SPARK-32920: merger locations not empty") { + initPushBasedShuffleConfs(conf) + conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 2 + + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + + // Submit a reduce job that depends which will create a map stage + submit(reduceRdd, (0 until parts).toArray) + completeShuffleMapStageSuccessfully(0, 0, parts) + val shuffleStage = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] + assert(shuffleStage.shuffleDep.getMergerLocs.nonEmpty) + + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts) + completeNextResultStageWithSuccess(1, 0) + assert(results === Map(0 -> 42, 1 -> 42)) + + results.clear() + assertDataStructuresEmpty() + } + + test("SPARK-32920: merger locations reuse from shuffle dependency") { + initPushBasedShuffleConfs(conf) + conf.set(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS, 3) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 2 + + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1)) + + completeShuffleMapStageSuccessfully(0, 0, parts) + assert(shuffleDep.getMergerLocs.nonEmpty) + val mergerLocs = shuffleDep.getMergerLocs + completeNextResultStageWithSuccess(1, 0 ) + + // submit another job w/ the shared dependency, and have a fetch failure + val reduce2 = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduce2, Array(0, 1)) + // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped + // stage. 
If instead it reused the existing stage, then this would be stage 2 + completeNextStageWithFetchFailure(3, 0, shuffleDep) + scheduler.resubmitFailedStages() + + assert(scheduler.runningStages.nonEmpty) + assert(scheduler.stageIdToStage(2) + .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs.nonEmpty) + val newMergerLocs = scheduler.stageIdToStage(2) + .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs + + // Check if same merger locs is reused for the new stage with shared shuffle dependency + assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host)) + completeShuffleMapStageSuccessfully(2, 0, 2) + completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) + assert(results === Map(0 -> 1234, 1 -> 1235)) + + assertDataStructuresEmpty() + } + + test("SPARK-32920: Disable shuffle merge due to not enough mergers available") { + initPushBasedShuffleConfs(conf) + conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 7 + + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + + // Submit a reduce job that depends which will create a map stage + submit(reduceRdd, (0 until parts).toArray) + completeShuffleMapStageSuccessfully(0, 0, parts) + val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] + assert(shuffleStage.shuffleDep.mergerLocs.isEmpty) + + completeNextResultStageWithSuccess(1, 0) + assert(results === Map(2 -> 42, 5 -> 42, 4 -> 42, 1 -> 42, 3 -> 42, 6 -> 42, 0 -> 42)) + + results.clear() + assertDataStructuresEmpty() + } + + test("SPARK-32920: Ensure child stage should not start before all the" + + " parent stages are completed with shuffle merge finalized for all the parent stages") { Review comment: Checked 
`DAGSchedulerSuite` as well as `HealthTrackerSuite`; both seem to use a 2-space indent. Do you see a 4-space indent used anywhere else? ########## File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ########## @@ -3393,6 +3406,271 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(rprofsE === Set()) } + private def initPushBasedShuffleConfs(conf: SparkConf) = { + conf.set(config.SHUFFLE_SERVICE_ENABLED, true) + conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true) + conf.set("spark.master", "pushbasedshuffleclustermanager") + } + + test("SPARK-32920: shuffle merge finalization") { + initPushBasedShuffleConfs(conf) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 2 + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + + // Submit a reduce job that depends which will create a map stage + submit(reduceRdd, (0 until parts).toArray) + completeShuffleMapStageSuccessfully(0, 0, parts) + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts) + completeNextResultStageWithSuccess(1, 0) + assert(results === Map(0 -> 42, 1 -> 42)) + results.clear() + assertDataStructuresEmpty() + } + + test("SPARK-32920: merger locations not empty") { + initPushBasedShuffleConfs(conf) + conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 2 + + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + + // Submit a reduce job that depends which will create a map stage + 
submit(reduceRdd, (0 until parts).toArray) + completeShuffleMapStageSuccessfully(0, 0, parts) + val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] + assert(shuffleStage.shuffleDep.getMergerLocs.nonEmpty) + + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts) + completeNextResultStageWithSuccess(1, 0) + assert(results === Map(0 -> 42, 1 -> 42)) + + results.clear() + assertDataStructuresEmpty() + } + + test("SPARK-32920: merger locations reuse from shuffle dependency") { + initPushBasedShuffleConfs(conf) + conf.set(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS, 3) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 2 + + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0, 1)) + + completeShuffleMapStageSuccessfully(0, 0, parts) + assert(shuffleDep.getMergerLocs.nonEmpty) + val mergerLocs = shuffleDep.getMergerLocs + completeNextResultStageWithSuccess(1, 0 ) + + // submit another job w/ the shared dependency, and have a fetch failure + val reduce2 = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduce2, Array(0, 1)) + // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped + // stage. 
If instead it reused the existing stage, then this would be stage 2 + completeNextStageWithFetchFailure(3, 0, shuffleDep) + scheduler.resubmitFailedStages() + + assert(scheduler.runningStages.nonEmpty) + assert(scheduler.stageIdToStage(2) + .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs.nonEmpty) + val newMergerLocs = scheduler.stageIdToStage(2) + .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs + + // Check if same merger locs is reused for the new stage with shared shuffle dependency + assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host)) + completeShuffleMapStageSuccessfully(2, 0, 2) + completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) + assert(results === Map(0 -> 1234, 1 -> 1235)) + + assertDataStructuresEmpty() + } + + test("SPARK-32920: Disable shuffle merge due to not enough mergers available") { + initPushBasedShuffleConfs(conf) + conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 7 + + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + + // Submit a reduce job that depends which will create a map stage + submit(reduceRdd, (0 until parts).toArray) + completeShuffleMapStageSuccessfully(0, 0, parts) + val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] + assert(shuffleStage.shuffleDep.mergerLocs.isEmpty) + + completeNextResultStageWithSuccess(1, 0) + assert(results === Map(2 -> 42, 5 -> 42, 4 -> 42, 1 -> 42, 3 -> 42, 6 -> 42, 0 -> 42)) + + results.clear() + assertDataStructuresEmpty() + } + + test("SPARK-32920: Ensure child stage should not start before all the" + + " parent stages are completed with shuffle merge finalized for all the parent stages") { + 
initPushBasedShuffleConfs(conf) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 1 + val shuffleMapRdd1 = new MyRDD(sc, parts, Nil) + val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts)) + + val shuffleMapRdd2 = new MyRDD(sc, parts, Nil) + val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts)) + + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2), tracker = mapOutputTracker) + + // Submit a reduce job + submit(reduceRdd, (0 until parts).toArray) + + complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1)))) + val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] + assert(shuffleStage1.shuffleDep.getMergerLocs.nonEmpty) + + complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1)))) + val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage] + assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty) + + assert(shuffleStage2.isMergeFinalized) + assert(shuffleStage1.isMergeFinalized) + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep1.shuffleId) == parts) + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep2.shuffleId) == parts) + + completeNextResultStageWithSuccess(2, 0) + assert(results === Map(0 -> 42)) + results.clear() + assertDataStructuresEmpty() + } + + test("SPARK-32920: Reused ShuffleDependency with Shuffle Merge disabled for the corresponding" + + " ShuffleDependency should not cause DAGScheduler to hang") { + initPushBasedShuffleConfs(conf) + conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 10) + DAGSchedulerSuite.clearMergerLocs + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + val parts = 20 + + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, 
List(shuffleDep), tracker = mapOutputTracker) + val partitions = (0 until parts).toArray + submit(reduceRdd, partitions) + + completeShuffleMapStageSuccessfully(0, 0, parts) + val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] + assert(shuffleStage.shuffleDep.mergerLocs.isEmpty) + + completeNextResultStageWithSuccess(1, 0) + val reduce2 = new MyRDD(sc, parts, List(shuffleDep)) + submit(reduce2, partitions) + // Stage 2 should not be executed as it should reuse the already computed shuffle output + assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics == null) + completeNextResultStageWithSuccess(3, 0, idx => idx + 1234) + + val expected = (0 until parts).map(idx => (idx, idx + 1234)) + assert(results === expected.toMap) + + assertDataStructuresEmpty() + } + + test("SPARK-32920: Reused ShuffleDependency with Shuffle Merge disabled for the corresponding" + + " ShuffleDependency with shuffle data loss should recompute missing partitions") { Review comment: Same as above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org