venkata91 commented on a change in pull request #33896:
URL: https://github.com/apache/spark/pull/33896#discussion_r749635349



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2023,71 +2070,140 @@ private[spark] class DAGScheduler(
   }
 
   /**
-   * Schedules shuffle merge finalize.
+   *
+   * Schedules shuffle merge finalization.
+   *
+   * @param stage the stage to finalize shuffle merge
+   * @param delay how long to wait before finalizing shuffle merge
+   * @param registerMergeResults whether to wait for merge results before 
scheduling the next stage
+   * @return whether the caller is able to schedule a finalize task
    */
-  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): 
Unit = {
-    // TODO: SPARK-33701: Instead of waiting for a constant amount of time for 
finalization
-    // TODO: for all the stages, adaptively tune timeout for merge finalization
-    logInfo(("%s (%s) scheduled for finalizing" +
-      " shuffle merge in %s s").format(stage, stage.name, 
shuffleMergeFinalizeWaitSec))
-    shuffleMergeFinalizeScheduler.schedule(
-      new Runnable {
-        override def run(): Unit = finalizeShuffleMerge(stage)
-      },
-      shuffleMergeFinalizeWaitSec,
-      TimeUnit.SECONDS
-    )
+  private[scheduler] def scheduleShuffleMergeFinalize(
+      stage: ShuffleMapStage,
+      delay: Long,
+      registerMergeResults: Boolean = true): Boolean = {
+    val shuffleDep = stage.shuffleDep
+    val scheduledTask: Option[ScheduledFuture[_]] = shuffleDep.getFinalizeTask
+    scheduledTask match {
+      case Some(task) =>
+        // If we find an already scheduled task, check if the task has been 
triggered yet.
+        // If it's already triggered, do nothing. Otherwise, cancel it and 
schedule a new
+        // one for immediate execution. Note that we should get here only when
+        // handleShufflePushCompleted schedules a finalize task after the 
shuffle map stage
+        // completed earlier and scheduled a task with default delay.
+        if (task.getDelay(TimeUnit.NANOSECONDS) > 0) {
+          task.cancel(false)
+          // The current task should be coming from 
handleShufflePushCompleted, thus the
+          // delay should be 0 and registerMergeResults should be true.
+          assert(delay == 0 && registerMergeResults)

Review comment:
       Yeah since we are scheduling the finalize only if there is no scheduled 
task when the stage completes (one that checks if `totalShuffleSize < 
shuffleMergeWaitMinSizeThreshold`. We can move this assertion outside. Will 
make the change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to