otterc commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626226635



##########
File path: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3393,6 +3406,271 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assert(rprofsE === Set())
   }
 
+  private def initPushBasedShuffleConfs(conf: SparkConf) = {
+    conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+    conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set("spark.master", "pushbasedshuffleclustermanager")
+  }
+
+  test("SPARK-32920: shuffle merge finalization") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+    val parts = 2
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) 
== parts)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: merger locations not empty") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    val shuffleStage = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.shuffleDep.getMergerLocs.nonEmpty)
+
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) 
== parts)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: merger locations reuse from shuffle dependency") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS, 3)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    assert(shuffleDep.getMergerLocs.nonEmpty)
+    val mergerLocs = shuffleDep.getMergerLocs
+    completeNextResultStageWithSuccess(1, 0 )
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency 
produces a new, skipped
+    // stage.  If instead it reused the existing stage, then this would be 
stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(scheduler.stageIdToStage(2)
+      .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs.nonEmpty)
+    val newMergerLocs = scheduler.stageIdToStage(2)
+      .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+
+    // Check if same merger locs is reused for the new stage with shared 
shuffle dependency
+    assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host))
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Disable shuffle merge due to not enough mergers 
available") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    val shuffleStage = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(2 -> 42, 5 -> 42, 4 -> 42, 1 -> 42, 3 -> 42, 6 -> 
42, 0 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Ensure child stage should not start before all the" +
+    " parent stages are completed with shuffle merge finalized for all the 
parent stages") {

Review comment:
       There are existing tests in `DAGSchedulerSuite` that indent the continuation line of a multi-line test name by 4 spaces, for example:
   ```
     test("[SPARK-19263] DAGScheduler should not submit multiple active 
tasksets," +
         " even with late completions from earlier stage attempts")
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to