venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626224298



##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3393,6 +3406,271 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(rprofsE === Set())
   }
 
+  private def initPushBasedShuffleConfs(conf: SparkConf) = {
+    conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+    conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set("spark.master", "pushbasedshuffleclustermanager")
+  }
+
+  test("SPARK-32920: shuffle merge finalization") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on the shuffle, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: merger locations not empty") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on the shuffle, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.shuffleDep.getMergerLocs.nonEmpty)
+
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: merger locations reuse from shuffle dependency") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS, 3)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    assert(shuffleDep.getMergerLocs.nonEmpty)
+    val mergerLocs = shuffleDep.getMergerLocs
+    completeNextResultStageWithSuccess(1, 0)
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage.  If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(scheduler.stageIdToStage(2)
+      .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs.nonEmpty)
+    val newMergerLocs = scheduler.stageIdToStage(2)
+      .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+
+    // Check that the same merger locs are reused for the new stage with shared shuffle dependency
+    assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host))
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Disable shuffle merge due to not enough mergers available") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on the shuffle, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(2 -> 42, 5 -> 42, 4 -> 42, 1 -> 42, 3 -> 42, 6 -> 42, 0 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Ensure child stage should not start before all the" +
+    " parent stages are completed with shuffle merge finalized for all the parent stages") {

Review comment:
       Checked `DAGSchedulerSuite` as well as `HealthTrackerSuite`; those seem to use 2-space indents. Do you see 4 spaces anywhere else?
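
       For reference, here is a minimal sketch of the 2-space continuation indent in question (hypothetical suite name and body, using plain ScalaTest `AnyFunSuite` rather than `SparkFunSuite` just so it stands alone):

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical suite illustrating the 2-space indent for a wrapped test name,
// matching the style used in DAGSchedulerSuite and HealthTrackerSuite.
class IndentStyleExampleSuite extends AnyFunSuite {
  test("SPARK-32920: Ensure child stage should not start before all the" +
    " parent stages are completed with shuffle merge finalized for all the parent stages") {
    assert(true) // placeholder body; only the 2-space indent of the continued string matters
  }
}
```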

##########
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##########
@@ -3393,6 +3406,271 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(rprofsE === Set())
   }
 
+  private def initPushBasedShuffleConfs(conf: SparkConf) = {
+    conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+    conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set("spark.master", "pushbasedshuffleclustermanager")
+  }
+
+  test("SPARK-32920: shuffle merge finalization") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on the shuffle, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: merger locations not empty") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on the shuffle, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.shuffleDep.getMergerLocs.nonEmpty)
+
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: merger locations reuse from shuffle dependency") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS, 3)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    assert(shuffleDep.getMergerLocs.nonEmpty)
+    val mergerLocs = shuffleDep.getMergerLocs
+    completeNextResultStageWithSuccess(1, 0)
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage.  If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(scheduler.stageIdToStage(2)
+      .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs.nonEmpty)
+    val newMergerLocs = scheduler.stageIdToStage(2)
+      .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+
+    // Check that the same merger locs are reused for the new stage with shared shuffle dependency
+    assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host))
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Disable shuffle merge due to not enough mergers available") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on the shuffle, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)
+
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(2 -> 42, 5 -> 42, 4 -> 42, 1 -> 42, 3 -> 42, 6 -> 42, 0 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Ensure child stage should not start before all the" +
+    " parent stages are completed with shuffle merge finalized for all the parent stages") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 1
+    val shuffleMapRdd1 = new MyRDD(sc, parts, Nil)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(parts))
+
+    val shuffleMapRdd2 = new MyRDD(sc, parts, Nil)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(parts))
+
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep1, shuffleDep2), tracker = mapOutputTracker)
+
+    // Submit a reduce job
+    submit(reduceRdd, (0 until parts).toArray)
+
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+    val shuffleStage1 = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.shuffleDep.getMergerLocs.nonEmpty)
+
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
+
+    assert(shuffleStage2.isMergeFinalized)
+    assert(shuffleStage1.isMergeFinalized)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep1.shuffleId) == parts)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep2.shuffleId) == parts)
+
+    completeNextResultStageWithSuccess(2, 0)
+    assert(results === Map(0 -> 42))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Reused ShuffleDependency with Shuffle Merge disabled for the corresponding" +
+    " ShuffleDependency should not cause DAGScheduler to hang") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 10)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 20
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+    val partitions = (0 until parts).toArray
+    submit(reduceRdd, partitions)
+
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.shuffleDep.mergerLocs.isEmpty)
+
+    completeNextResultStageWithSuccess(1, 0)
+    val reduce2 = new MyRDD(sc, parts, List(shuffleDep))
+    submit(reduce2, partitions)
+    // Stage 2 should not be executed as it should reuse the already computed shuffle output
+    assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics == null)
+    completeNextResultStageWithSuccess(3, 0, idx => idx + 1234)
+
+    val expected = (0 until parts).map(idx => (idx, idx + 1234))
+    assert(results === expected.toMap)
+
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Reused ShuffleDependency with Shuffle Merge disabled for the corresponding" +
+    " ShuffleDependency with shuffle data loss should recompute missing partitions") {

Review comment:
       Same as above.



