mridulm commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r646262415



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2000,6 +2025,149 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, 
shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+    externalShuffleClient.foreach { shuffleClient =>
+      val shuffleId = stage.shuffleDep.shuffleId
+      val numMergers = stage.shuffleDep.getMergerLocs.length
+      val numResponses = new AtomicInteger()
+      val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+      val timedOut = new AtomicBoolean()

Review comment:
       `timedOut` is insufficient to prevent a race - we can have:
   a) `ShuffleMergeFinalized` being posted in the IO thread when all partitions complete.
   b) The futures timing out before `increaseAndCheckResponseCount` completes.
   
   This will result in two `ShuffleMergeFinalized` events being posted - and the second one will invalidate the merge result.
   
   To fix this, I propose the following:
   
   a) Rename `timedOut` to `shouldPostFinalizedEvent`, with a default value of `true`.
   b) Post the event only when `shouldPostFinalizedEvent.getAndSet(false)` returns true (in `increaseAndCheckResponseCount` and in the `TimeoutException` catch block), as sketched below.
   
   This ensures that only one `ShuffleMergeFinalized` is posted - either due to the timeout or due to the required mergers completing.
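   
   Roughly what I have in mind - a minimal sketch only, where `FinalizeGuardSketch`, `postShuffleMergeFinalized` and the `awaitAll` callback are placeholders for this PR's actual event posting and future-waiting code:
   
   ```scala
   import java.util.concurrent.TimeoutException
   import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
   
   // Sketch of the single-post guard; the real code would live in
   // finalizeShuffleMerge and post ShuffleMergeFinalized to the event loop.
   class FinalizeGuardSketch(numMergers: Int) {
     private val shouldPostFinalizedEvent = new AtomicBoolean(true)
     private val numResponses = new AtomicInteger()
   
     // Placeholder for eventProcessLoop.post(ShuffleMergeFinalized(stage)).
     private def postShuffleMergeFinalized(): Unit =
       println("ShuffleMergeFinalized posted once")
   
     // Called from the IO thread as each merger responds successfully.
     def increaseAndCheckResponseCount(): Unit = {
       if (numResponses.incrementAndGet() == numMergers &&
           shouldPostFinalizedEvent.getAndSet(false)) {
         postShuffleMergeFinalized()
       }
     }
   
     // Called from the finalize thread around the blocking wait on the futures.
     def waitForResponses(awaitAll: () => Unit): Unit = {
       try {
         awaitAll()
       } catch {
         case _: TimeoutException =>
           if (shouldPostFinalizedEvent.getAndSet(false)) {
             postShuffleMergeFinalized()
           }
       }
     }
   }
   ```
   
   Since `getAndSet(false)` atomically returns the previous value, at most one of the two paths (all mergers responding, or the wait timing out) can post the event.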
   
   Thoughts ?



