mridulm commented on code in PR #37533: URL: https://github.com/apache/spark/pull/37533#discussion_r954454349
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2259,37 +2259,51 @@ private[spark] class DAGScheduler(
                     }
 
                     override def onShuffleMergeFailure(e: Throwable): Unit = {
+                      if (e.isInstanceOf[IOException]) {
+                        logInfo(s"Failed to connect external shuffle service " +
+                          s"${shuffleServiceLoc.hostPort}")
+                        blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+                      }
                     }
                   })
               }
             }
           }, 0, TimeUnit.SECONDS)
         } else {
-          stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-            case (shuffleServiceLoc, index) =>
-              // Sends async request to shuffle service to finalize shuffle merge on that host
-              // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
-              // TODO: during shuffleMergeFinalizeWaitSec
-              shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-                shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-                new MergeFinalizerListener {
-                  override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
-                    assert(shuffleId == statuses.shuffleId)
-                    eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
-                      convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
-                    results(index).set(true)
-                  }
+          shuffleMergeFinalizeScheduler.schedule(new Runnable {

Review Comment:
Thoughts on pushing the finalization send into the threadpool instead?

```
val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
  case (shuffleServiceLoc, index) =>
    shuffleMergeFinalizeScheduler.schedule(new Runnable() {
      override def run(): Unit = {
        // existing code within the "case"
      }
    }, 0, TimeUnit.SECONDS)
}.toList
```

And if there is a `TimeoutException`, we cancel all the `scheduledFutures` (`scheduledFutures.foreach(_.cancel(true))`).
We should also bump up the default number of threads in this threadpool.

This will make sure that we wait at most `shuffleMergeResultsTimeoutSec` seconds for finalization to complete.
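To make the cancel-on-timeout idea concrete, here is a minimal standalone sketch of the pattern, not the actual DAGScheduler change. The names `FinalizeSendSketch`, `sendAll`, and `Send` are hypothetical, each `Send` stands in for one blocking finalize request to a merger host, and a `CountDownLatch` replaces the `TimeoutException` path purely to keep the example short.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FinalizeSendSketch {
  // Hypothetical stand-in for one blocking finalize request to a merger host.
  type Send = () => Unit

  /**
   * Schedules every send as its own task on the pool, waits at most
   * `timeoutMs` for all of them, and cancels whatever is still pending.
   * Returns how many sends had finished when the wait ended.
   */
  def sendAll(sends: Seq[Send], numThreads: Int, timeoutMs: Long): Int = {
    val pool = Executors.newScheduledThreadPool(numThreads)
    val completed = new AtomicInteger(0)
    val done = new CountDownLatch(sends.size)
    try {
      val scheduledFutures = sends.map { send =>
        pool.schedule(new Runnable {
          override def run(): Unit = {
            try {
              send()
              completed.incrementAndGet()
            } finally {
              // Count down even if the send failed or was interrupted.
              done.countDown()
            }
          }
        }, 0L, TimeUnit.MILLISECONDS)
      }.toList

      // await returns false when the timeout elapses before all sends finish.
      if (!done.await(timeoutMs, TimeUnit.MILLISECONDS)) {
        // Interrupt the slow senders so their threads are released.
        scheduledFutures.foreach(_.cancel(true))
      }
      completed.get()
    } finally {
      pool.shutdownNow()
    }
  }
}
```

With enough threads in the pool, one slow host only delays its own task; the other sends proceed and the caller returns once the timeout elapses.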
In almost all cases, `shuffleClient.finalizeShuffleMerge` will be really quick, so the overhead is fairly low. In the rare cases where it is not, we will only block that specific sending thread (all other threads will still send the finalize message to the other merger hosts), and we will always complete within the timeout (and release the blocked threads).

Thoughts?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org