otterc commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r881028865


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
               mapOutputTracker.
                 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
             }
+          } else {
+            // Unregister the merge result of <shuffleId, reduceId> if
+            // there is a FetchFailed event and is not a
+            // MetaDataFetchException which is signified by bmAddress being 
null

Review Comment:
   Nit: this can fit in 2 lines.



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2449,7 +2459,12 @@ private[spark] class DAGScheduler(
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     logDebug(s"Considering removal of executor $execId; " +
       s"fileLost: $fileLost, currentEpoch: $currentEpoch")
-    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) 
< currentEpoch) {
+    // Check if the execId is a shuffle push merger
+    // We do not remove the executor if it is,
+    // and only remove the outputs on the host.

Review Comment:
   Nit: This can fit in 2 lines



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4342,6 +4342,95 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  test("SPARK-38987: corrupted shuffle block FetchFailure should unregister 
merge result") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+
+    scheduler = new MyDAGScheduler(
+      sc,
+      taskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env,
+      shuffleMergeFinalize = false,
+      shuffleMergeRegister = false)
+    dagEventProcessLoopTester = new 
DAGSchedulerEventProcessLoopTester(scheduler)
+
+    val parts = 2
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val shuffleMapStage = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+      Seq((0, makeMergeStatus("hostA", shuffleDep.shuffleMergeId))))
+    scheduler.handleShuffleMergeFinalized(shuffleMapStage,
+      shuffleMapStage.shuffleDep.shuffleMergeId)
+    scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+      Seq((1, makeMergeStatus("hostA", shuffleDep.shuffleMergeId))))
+
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) 
== 1)
+
+    // Complete shuffle map stage with FetchFailed on hostA
+    complete(taskSets(0), taskSets(0).tasks.zipWithIndex.map {
+      case (task, _) =>
+        (FetchFailed(
+          makeBlockManagerId("hostA"),
+          shuffleDep.shuffleId, -1L, -1, 0, "corruption fetch failure"), null)
+    }.toSeq)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) 
== 0)
+  }
+
+  test("SPARK-38987: All shuffle outputs for a shuffle push" +

Review Comment:
   Nit could you modify the test name that this should happen when the config 
unRegisterOutputOnHost is true



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4342,6 +4342,95 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  test("SPARK-38987: corrupted shuffle block FetchFailure should unregister 
merge result") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+
+    scheduler = new MyDAGScheduler(
+      sc,
+      taskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env,
+      shuffleMergeFinalize = false,
+      shuffleMergeRegister = false)
+    dagEventProcessLoopTester = new 
DAGSchedulerEventProcessLoopTester(scheduler)
+
+    val parts = 2
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val shuffleMapStage = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+      Seq((0, makeMergeStatus("hostA", shuffleDep.shuffleMergeId))))
+    scheduler.handleShuffleMergeFinalized(shuffleMapStage,
+      shuffleMapStage.shuffleDep.shuffleMergeId)
+    scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+      Seq((1, makeMergeStatus("hostA", shuffleDep.shuffleMergeId))))
+
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) 
== 1)
+
+    // Complete shuffle map stage with FetchFailed on hostA
+    complete(taskSets(0), taskSets(0).tasks.zipWithIndex.map {
+      case (task, _) =>
+        (FetchFailed(
+          makeBlockManagerId("hostA"),
+          shuffleDep.shuffleId, -1L, -1, 0, "corruption fetch failure"), null)
+    }.toSeq)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) 
== 0)
+  }
+
+  test("SPARK-38987: All shuffle outputs for a shuffle push" +
+    " merger executor should be cleaned up on a fetch failure") {
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on 
hostB
+    completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", 
"hostB"))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER,
+      ExecutorExited(-100, false, "Container marked as failed")))
+
+    // Shuffle push merger executor should not be removed and the shuffle 
files are not unregistered
+    verify(blockManagerMaster, 
times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(mapOutputTracker,
+      
times(0)).removeOutputsOnExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, 
"hostA", 12345),
+        shuffleId, 0L, 0, 0, "ignored"), null)
+    ))
+
+    // Verify that we are not removing the executor,
+    // and that we are only removing the outputs on the host
+    verify(blockManagerMaster, 
times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(mapOutputTracker,
+      times(1)).removeOutputsOnHost("hostA")
+
+    // Shuffle files for shuffle-push-merger executor should be lost

Review Comment:
   What do you mean by this? There isn't any executor `shuffle-push-merger`. We 
should just assert here that all the mapStatus and mergeStatus on the host are 
removed



##########
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala:
##########
@@ -1786,4 +1786,32 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
       ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
   }
 
+  test("SPARK-38987: failure to fetch corrupted shuffle block chunk should " +
+    "throw a FetchFailedException when corruption detection is turned off") {

Review Comment:
   Nit: change this to `when early detection is unable to catch corruption` 
because it may be possible that the block is corrupt but the first 
maxBytesInFlight/3 are not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to