venkata91 commented on a change in pull request #33896: URL: https://github.com/apache/spark/pull/33896#discussion_r749635349
########## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ########## @@ -2023,71 +2070,140 @@ private[spark] class DAGScheduler( } /** - * Schedules shuffle merge finalize. + * + * Schedules shuffle merge finalization. + * + * @param stage the stage to finalize shuffle merge + * @param delay how long to wait before finalizing shuffle merge + * @param registerMergeResults whether to wait for merge results before scheduling the next stage + * @return whether the caller is able to schedule a finalize task */ - private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = { - // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization - // TODO: for all the stages, adaptively tune timeout for merge finalization - logInfo(("%s (%s) scheduled for finalizing" + - " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec)) - shuffleMergeFinalizeScheduler.schedule( - new Runnable { - override def run(): Unit = finalizeShuffleMerge(stage) - }, - shuffleMergeFinalizeWaitSec, - TimeUnit.SECONDS - ) + private[scheduler] def scheduleShuffleMergeFinalize( + stage: ShuffleMapStage, + delay: Long, + registerMergeResults: Boolean = true): Boolean = { + val shuffleDep = stage.shuffleDep + val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask + scheduledTask match { + case Some(task) => + // If we find an already scheduled task, check if the task has been triggered yet. + // If it's already triggered, do nothing. Otherwise, cancel it and schedule a new + // one for immediate execution. Note that we should get here only when + // handleShufflePushCompleted schedules a finalize task after the shuffle map stage + // completed earlier and scheduled a task with default delay. 
+ if (task.getDelay(TimeUnit.NANOSECONDS) > 0) { + task.cancel(false) + // The current task should be coming from handleShufflePushCompleted, thus the + // delay should be 0 and registerMergeResults should be true. + assert(delay == 0 && registerMergeResults) Review comment: Yeah, since we schedule the finalize only if there is no scheduled task when the stage completes (the one that checks if `totalShuffleSize < shuffleMergeWaitMinSizeThreshold`), we can move this assertion outside. Will make the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org