otterc commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r881028865
########## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ########## @@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { + // Unregister the merge result of <shuffleId, reduceId> if + // there is a FetchFailed event and is not a + // MetaDataFetchException which is signified by bmAddress being null Review Comment: Nit: this can fit in 2 lines. ########## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ########## @@ -2449,7 +2459,12 @@ private[spark] class DAGScheduler( val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) logDebug(s"Considering removal of executor $execId; " + s"fileLost: $fileLost, currentEpoch: $currentEpoch") - if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) { + // Check if the execId is a shuffle push merger + // We do not remove the executor if it is, + // and only remove the outputs on the host. 
Review Comment: Nit: This can fit in 2 lines ########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -4342,6 +4342,95 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assertDataStructuresEmpty() } + test("SPARK-38987: corrupted shuffle block FetchFailure should unregister merge result") { + initPushBasedShuffleConfs(conf) + DAGSchedulerSuite.clearMergerLocs() + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + + scheduler = new MyDAGScheduler( + sc, + taskScheduler, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env, + shuffleMergeFinalize = false, + shuffleMergeRegister = false) + dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler) + + val parts = 2 + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + + // Submit a reduce job that depends which will create a map stage + submit(reduceRdd, (0 until parts).toArray) + + val shuffleMapStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] + scheduler.handleRegisterMergeStatuses(shuffleMapStage, + Seq((0, makeMergeStatus("hostA", shuffleDep.shuffleMergeId)))) + scheduler.handleShuffleMergeFinalized(shuffleMapStage, + shuffleMapStage.shuffleDep.shuffleMergeId) + scheduler.handleRegisterMergeStatuses(shuffleMapStage, + Seq((1, makeMergeStatus("hostA", shuffleDep.shuffleMergeId)))) + + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 1) + + // Complete shuffle map stage with FetchFailed on hostA + complete(taskSets(0), taskSets(0).tasks.zipWithIndex.map { + case (task, _) => + (FetchFailed( + makeBlockManagerId("hostA"), + shuffleDep.shuffleId, -1L, -1, 0, "corruption fetch failure"), null) + }.toSeq) + 
assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 0) + } + + test("SPARK-38987: All shuffle outputs for a shuffle push" + Review Comment: Nit: could you modify the test name to indicate that this should happen when the config `unRegisterOutputOnHost` is true? ########## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ########## @@ -4342,6 +4342,95 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assertDataStructuresEmpty() } + test("SPARK-38987: corrupted shuffle block FetchFailure should unregister merge result") { + initPushBasedShuffleConfs(conf) + DAGSchedulerSuite.clearMergerLocs() + DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + + scheduler = new MyDAGScheduler( + sc, + taskScheduler, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env, + shuffleMergeFinalize = false, + shuffleMergeRegister = false) + dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler) + + val parts = 2 + val shuffleMapRdd = new MyRDD(sc, parts, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) + val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + + // Submit a reduce job that depends which will create a map stage + submit(reduceRdd, (0 until parts).toArray) + + val shuffleMapStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] + scheduler.handleRegisterMergeStatuses(shuffleMapStage, + Seq((0, makeMergeStatus("hostA", shuffleDep.shuffleMergeId)))) + scheduler.handleShuffleMergeFinalized(shuffleMapStage, + shuffleMapStage.shuffleDep.shuffleMergeId) + scheduler.handleRegisterMergeStatuses(shuffleMapStage, + Seq((1, makeMergeStatus("hostA", shuffleDep.shuffleMergeId)))) + + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 1) + + // Complete shuffle map stage with FetchFailed on hostA + complete(taskSets(0), 
taskSets(0).tasks.zipWithIndex.map { + case (task, _) => + (FetchFailed( + makeBlockManagerId("hostA"), + shuffleDep.shuffleId, -1L, -1, 0, "corruption fetch failure"), null) + }.toSeq) + assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == 0) + } + + test("SPARK-38987: All shuffle outputs for a shuffle push" + + " merger executor should be cleaned up on a fetch failure") { + conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") + conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true") + + val shuffleMapRdd = new MyRDD(sc, 3, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker) + + submit(reduceRdd, Array(0, 1, 2)) + // Map stage completes successfully, + // two tasks are run on an executor on hostA and one on an executor on hostB + completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB")) + // Now the executor on hostA is lost + runEvent(ExecutorLost(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, + ExecutorExited(-100, false, "Container marked as failed"))) + + // Shuffle push merger executor should not be removed and the shuffle files are not unregistered + verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER) + verify(mapOutputTracker, + times(0)).removeOutputsOnExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER) + + // Now a fetch failure from the lost executor occurs + complete(taskSets(1), Seq( + (FetchFailed(BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, "hostA", 12345), + shuffleId, 0L, 0, 0, "ignored"), null) + )) + + // Verify that we are not removing the executor, + // and that we are only removing the outputs on the host + verify(blockManagerMaster, times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER) + verify(mapOutputTracker, + times(1)).removeOutputsOnHost("hostA") + + // Shuffle files for 
shuffle-push-merger executor should be lost Review Comment: What do you mean by this? There isn't any executor `shuffle-push-merger`. We should just assert here that all the mapStatus and mergeStatus on the host are removed ########## core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala: ########## @@ -1786,4 +1786,32 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2))) } + test("SPARK-38987: failure to fetch corrupted shuffle block chunk should " + + "throw a FetchFailedException when corruption detection is turned off") { Review Comment: Nit: change this to `when early detection is unable to catch corruption` because it may be possible that the block is corrupt but the first maxBytesInFlight/3 bytes are not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org