mridulm commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r640258696



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
     }
   }
 
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+      mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses)
+    }
+  }
+
+  private[scheduler] def handleShuffleMergeFinalized(
+      stage: ShuffleMapStage): Unit = {

Review comment:
       nit: combine it with the previous line?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
     }
   }
 
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+      mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses)
+    }
+  }
+
+  private[scheduler] def handleShuffleMergeFinalized(
+      stage: ShuffleMapStage): Unit = {
+    // Only update MapOutputTracker metadata if the stage is still active. i.e not cancelled.
+    if (runningStages.contains(stage)) {
+      stage.shuffleDep.markShuffleMergeFinalized()
+      processShuffleMapStageCompletion(stage)
+    } else {
+      mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)

Review comment:
       In the scheduler, checking whether a stage is in `runningStages` is the typical way to determine whether it has completed.
   Outside the scheduler, `stage.latestInfo.failureReason` is used to identify stage failure.
   
   That said, `latestInfo` is overwritten when a new attempt starts.
   
   For the specific case we discussed (the stage is resubmitted after being cancelled once merge finalization has been initiated), a new stage attempt keeps the stage in `runningStages` (but for the newer attempt), so the check would be:
   
   a) Fetch `stage.latestInfo.attemptNumber()` at the start of `scheduleShuffleMergeFinalize` (within the DAG scheduler event loop), and pass it along `finalizeShuffleMerge` -> `RegisterMergeStatuses` and `ShuffleMergeFinalized`.
   
   In `handleShuffleMergeFinalized`:
   b) If the stage is not in `runningStages` (as is currently done), it is no longer running; or
   c) If `stage.latestInfo.attemptNumber()` != `ShuffleMergeFinalized.attemptNumber`, the attempt has changed, and the finalization message belongs to an older attempt.
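A minimal Scala sketch of steps (a) to (c), assuming heavily simplified, hypothetical stand-ins: `StageInfo`, `ShuffleMapStage`, `ShuffleMergeFinalized`, `SchedulerSketch` and its `startNewAttempt`, `completed` and `ignored` members are illustrations only, not Spark's actual classes or signatures (for instance, `attemptNumber` is a plain field here). It shows why membership in `runningStages` alone cannot distinguish a resubmitted attempt from the one that initiated finalization, while comparing the captured attempt number can.

```scala
import scala.collection.mutable

// Hypothetical stand-ins, NOT Spark's real StageInfo / ShuffleMapStage / DAGScheduler.
final case class StageInfo(attemptNumber: Int)

final class ShuffleMapStage(val id: Int) {
  // Like latestInfo in the review discussion, this is overwritten when a new attempt starts.
  var latestInfo: StageInfo = StageInfo(0)
  var shuffleMergeFinalized: Boolean = false

  // Hypothetical helper that simulates a resubmission (a new stage attempt).
  def startNewAttempt(): Unit =
    latestInfo = StageInfo(latestInfo.attemptNumber + 1)
}

// Hypothetical event carrying the attempt number captured in step (a).
final case class ShuffleMergeFinalized(stage: ShuffleMapStage, attemptNumber: Int)

final class SchedulerSketch {
  val runningStages = mutable.HashSet.empty[ShuffleMapStage]
  val completed = mutable.ArrayBuffer.empty[Int] // stage ids whose completion was processed
  val ignored = mutable.ArrayBuffer.empty[Int]   // stage ids whose message was dropped

  // Step (a): capture the attempt number inside the event loop when finalization is scheduled.
  def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): ShuffleMergeFinalized =
    ShuffleMergeFinalized(stage, stage.latestInfo.attemptNumber)

  def handleShuffleMergeFinalized(event: ShuffleMergeFinalized): Unit = {
    val stage = event.stage
    val stillRunning = runningStages.contains(stage)                        // step (b)
    val sameAttempt = stage.latestInfo.attemptNumber == event.attemptNumber // step (c)
    if (stillRunning && sameAttempt) {
      stage.shuffleMergeFinalized = true
      completed += stage.id // stands in for processShuffleMapStageCompletion(stage)
    } else {
      // Cancelled or superseded attempt: the message is stale, so do not finalize.
      ignored += stage.id
    }
  }
}
```

For example, if finalization is scheduled for attempt 0 and the stage is then resubmitted, the stale message is ignored even though the stage is still in `runningStages`; a message scheduled for the new attempt is then processed normally.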

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
     }
   }
 
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {

Review comment:
       Let us do this as follow-up work, given that the stage would have completed by the time that comes through.
   Can you add a TODO for this, @venkata91?
   
   Note: the check `runningStages.contains(stage)` is not sufficient, as I detail below for `handleShuffleMergeFinalized`; the same condition applies here too.
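Applying the same guard to `handleRegisterMergeStatuses` could look roughly like this, again under stated assumptions: `MergeStatus`, `ShuffleMapStage`, `RegisterMergeStatuses` (with an `attemptNumber` field) and `RegisterSketch` are hypothetical simplifications, the `Int` in each pair is assumed to be the reduce partition id, and the nested map only stands in for `MapOutputTracker` state. Statuses are registered only for the current attempt of a running stage whose shuffle merge is not yet finalized.

```scala
import scala.collection.mutable

// Hypothetical placeholders, NOT Spark's MergeStatus / ShuffleMapStage / MapOutputTracker.
final case class MergeStatus(location: String)

final class ShuffleMapStage(val shuffleId: Int) {
  var attemptNumber: Int = 0 // stands in for stage.latestInfo.attemptNumber()
  var shuffleMergeFinalized: Boolean = false
}

// Hypothetical event: attemptNumber is the attempt captured when finalization was scheduled.
final case class RegisterMergeStatuses(
    stage: ShuffleMapStage,
    attemptNumber: Int,
    mergeStatuses: Seq[(Int, MergeStatus)])

final class RegisterSketch {
  val runningStages = mutable.HashSet.empty[ShuffleMapStage]
  // shuffleId -> (reduce partition id -> status); a stand-in for MapOutputTracker state.
  val mergeResults = mutable.HashMap.empty[Int, mutable.HashMap[Int, MergeStatus]]

  def handleRegisterMergeStatuses(event: RegisterMergeStatuses): Unit = {
    val stage = event.stage
    // runningStages alone is not enough: a resubmitted stage is "running" again,
    // so the attempt captured in the event must also match the current one.
    val isCurrentAttempt =
      runningStages.contains(stage) && stage.attemptNumber == event.attemptNumber
    if (isCurrentAttempt && !stage.shuffleMergeFinalized) {
      val results = mergeResults.getOrElseUpdate(stage.shuffleId, mutable.HashMap.empty)
      results ++= event.mergeStatuses
    }
    // Otherwise the statuses are dropped: they belong to a cancelled or superseded
    // attempt, or they arrived after finalization.
  }
}
```

Late statuses for an already-finalized merge are simply dropped in this sketch; handling them properly is the follow-up the TODO would track.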




-- 
This is an automated message from the Apache Git Service.