mridulm commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626661012



##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -102,6 +102,12 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    */
   private[spark] var mergerLocs: Seq[BlockManagerId] = Nil
 
+  /**
+   * Stores the information about whether the shuffle merge is finalized for the shuffle map stage
+   * associated with this shuffle dependency
+   */
+  private[this] var shuffleMergedFinalized: Boolean = false

Review comment:
       super nit: `shuffleMergedFinalized` -> `_shuffleMergeFinalized` ?
   Also, do we want to move this to `ShuffleDependency` instead ?
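A minimal sketch of what the suggested rename could look like, following the usual backing-field pattern; the read-only accessor and the `markShuffleMergeFinalized()` helper below are illustrative names, not part of this PR:

```scala
// Illustrative only: renamed backing field plus a conventional accessor pair.
private[this] var _shuffleMergeFinalized: Boolean = false

private[spark] def shuffleMergeFinalized: Boolean = _shuffleMergeFinalized

// Side-effecting, so declared with ():
private[spark] def markShuffleMergeFinalized(): Unit = {
  _shuffleMergeFinalized = true
}
```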

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2004,6 +2006,142 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    // TODO Use the default single threaded scheduler or extend ThreadUtils to
+    // TODO support the multi-threaded scheduler?
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+    externalShuffleClient.foreach { shuffleClient =>
+      val shuffleId = stage.shuffleDep.shuffleId
+      val numMergers = stage.shuffleDep.getMergerLocs.length
+      val numResponses = new AtomicInteger()
+      val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+      val timedOut = new AtomicBoolean()
+
+      // NOTE: This is a defensive check to post finalize event if numMergers is 0 (i.e. no shuffle
+      // service available).
+      if (numMergers == 0) {
+        eventProcessLoop.post(ShuffleMergeFinalized(stage))
+        return
+      }
+
+      def increaseAndCheckResponseCount: Unit = {

Review comment:
       `increaseAndCheckResponseCount` -> `increaseAndCheckResponseCount()`
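For reference, a minimal sketch of the convention being asked for: side-effecting methods are declared (and called) with empty parentheses, while pure accessors omit them. The class and member names below are illustrative, not from the PR.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Illustrative example of the parentheses convention for side-effecting defs.
class ResponseCounter(expected: Int) {
  private val numResponses = new AtomicInteger()

  // Mutates state, so it is declared and invoked with ():
  def increaseAndCheckResponseCount(): Boolean =
    numResponses.incrementAndGet() == expected

  // Pure accessor, so no parentheses:
  def received: Int = numResponses.get()
}
```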

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2075,6 +2075,25 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
+    ConfigBuilder("spark.shuffle.push.merge.results.timeout")
+      .doc("Specify the max amount of time DAGScheduler waits for the merge 
results from " +
+        "all remote shuffle services for a given shuffle. DAGScheduler will 
start to submit " +
+        "following stages if not all results are received within the timeout.")
+      .version("3.1.0")

Review comment:
       Change version to 3.2.0 for these configs

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2075,6 +2075,25 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
+    ConfigBuilder("spark.shuffle.push.merge.results.timeout")
+      .doc("Specify the max amount of time DAGScheduler waits for the merge 
results from " +
+        "all remote shuffle services for a given shuffle. DAGScheduler will 
start to submit " +
+        "following stages if not all results are received within the timeout.")
+      .version("3.1.0")
+      .stringConf

Review comment:
       `stringConf` -> `timeConf` (here and elsewhere) and remove all manual conversions ?
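A sketch of what the `timeConf` variant could look like (also folding in the 3.2.0 version bump from the earlier comment); it assumes the `timeConf`/`createWithDefaultString` pattern already used by other entries in this file, with `java.util.concurrent.TimeUnit` in scope:

```scala
  private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
    ConfigBuilder("spark.shuffle.push.merge.results.timeout")
      .doc("Specify the max amount of time DAGScheduler waits for the merge results from " +
        "all remote shuffle services for a given shuffle. DAGScheduler will start to submit " +
        "following stages if not all results are received within the timeout.")
      .version("3.2.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("10s")
```

The entry then yields a `Long` in seconds, so the `JavaUtils.timeStringAsSec` conversions in `DAGScheduler` could be dropped in favor of plain `sc.getConf.get(...)`.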

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2075,6 +2075,25 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
+    ConfigBuilder("spark.shuffle.push.merge.results.timeout")
+      .doc("Specify the max amount of time DAGScheduler waits for the merge 
results from " +
+        "all remote shuffle services for a given shuffle. DAGScheduler will 
start to submit " +
+        "following stages if not all results are received within the timeout.")
+      .version("3.1.0")
+      .stringConf
+      .createWithDefault("10s")
+
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT =
+    ConfigBuilder("spark.shuffle.push.merge.finalize.timeout")
+      .doc("Specify the amount of time DAGScheduler waits after all mappers 
finish for " +
+        "a given shuffle map stage before it starts sending merge finalize 
requests to " +
+        "remote shuffle services. This allows the shuffle services some extra 
time to " +
+        "merge as many blocks as possible.")
+      .version("3.1.0")
+      .stringConf
+      .createWithDefault("10s")
+

Review comment:
       Taken together, this accounts for the driver waiting for up to 20s per stage ... do we have recommendations on how users can tune this ?
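For anyone experimenting with these knobs, a purely illustrative user-side tuning sketch (the keys come from this PR; the values are arbitrary placeholders, and the worst-case extra wait per stage is roughly the finalize wait plus the results timeout):

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning: shorten both waits for latency-sensitive jobs,
// or lengthen them for very large shuffles where merging lags behind.
val conf = new SparkConf()
  .set("spark.shuffle.push.merge.finalize.timeout", "5s")
  .set("spark.shuffle.push.merge.results.timeout", "5s")
```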

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -254,6 +259,28 @@ private[spark] class DAGScheduler(
   private val blockManagerMasterDriverHeartbeatTimeout =
     sc.getConf.get(config.STORAGE_BLOCKMANAGER_MASTER_DRIVER_HEARTBEAT_TIMEOUT).millis
 
+  private val shuffleMergeResultsTimeoutSec =
+    JavaUtils.timeStringAsSec(sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT))
+
+  private val shuffleMergeFinalizeWaitSec =
+    JavaUtils.timeStringAsSec(sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT))
+
+  // lazy initialized so that the shuffle client can be properly initialized
+  private lazy val externalShuffleClient: Option[ExternalBlockStoreClient] =
+    if (pushBasedShuffleEnabled) {
+      val transConf = SparkTransportConf.fromSparkConf(sc.conf, "shuffle", 1)
+      val shuffleClient = new ExternalBlockStoreClient(transConf, env.securityManager,
+        env.securityManager.isAuthenticationEnabled(),
+        sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
+      shuffleClient.init(sc.conf.getAppId)
+      Some(shuffleClient)
+    } else {
+      None
+    }
+
+  private val shuffleMergeFinalizeScheduler =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-merge-finalizer")

Review comment:
       Given the blocking `Futures.allAsList` call in the finalizer thread, a single-threaded executor would become a bottleneck when multiple stages are being finalized concurrently.
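One possible shape, assuming `ThreadUtils` offers (or is extended with) a pool-backed scheduled executor; the thread count is an arbitrary placeholder that would presumably be made configurable:

```scala
import org.apache.spark.util.ThreadUtils

// Sketch: multiple finalizer threads so concurrently completing stages do not
// queue behind one blocking Futures.allAsList(...).get(...) call.
private val shuffleMergeFinalizeScheduler =
  ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer", 8)
```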

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2004,6 +2006,142 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    // TODO Use the default single threaded scheduler or extend ThreadUtils to
+    // TODO support the multi-threaded scheduler?
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+    externalShuffleClient.foreach { shuffleClient =>
+      val shuffleId = stage.shuffleDep.shuffleId
+      val numMergers = stage.shuffleDep.getMergerLocs.length
+      val numResponses = new AtomicInteger()
+      val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+      val timedOut = new AtomicBoolean()
+
+      // NOTE: This is a defensive check to post finalize event if numMergers is 0 (i.e. no shuffle
+      // service available).
+      if (numMergers == 0) {
+        eventProcessLoop.post(ShuffleMergeFinalized(stage))
+        return
+      }
+
+      def increaseAndCheckResponseCount: Unit = {
+        if (numResponses.incrementAndGet() == numMergers) {
+          // Since this runs in the netty client thread and is outside of DAGScheduler
+          // event loop, we only post ShuffleMergeFinalized event into the event queue.
+          // The processing of this event should be done inside the event loop, so it
+          // can safely modify scheduler's internal state.
+          logInfo("%s (%s) shuffle merge finalized".format(stage, stage.name))
+          eventProcessLoop.post(ShuffleMergeFinalized(stage))
+        }
+      }
+
+      stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
+        case (shuffleServiceLoc, index) =>
+          // Sends async request to shuffle service to finalize shuffle merge on that host
+          shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+            shuffleServiceLoc.port, shuffleId,
+            new MergeFinalizerListener {
+              override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
+                assert(shuffleId == statuses.shuffleId)
+                // Register the merge results even if already timed out, in case the reducer
+                // needing this merged block starts after dag scheduler receives this response.
+                mapOutputTracker.registerMergeResults(statuses.shuffleId,
+                  MergeStatus.convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc))
+                if (!timedOut.get()) {
+                  increaseAndCheckResponseCount
+                  results(index).set(true)
+                }
+              }
+
+              override def onShuffleMergeFailure(e: Throwable): Unit = {
+                if (!timedOut.get()) {
+                  logWarning(s"Exception encountered when trying to finalize 
shuffle " +
+                    s"merge on ${shuffleServiceLoc.host} for shuffle 
$shuffleId", e)
+                  increaseAndCheckResponseCount
+                  // Do not fail the future as this would cause dag scheduler to prematurely
+                  // give up on waiting for merge results from the remaining shuffle services
+                  // if one fails
+                  results(index).set(false)
+                }
+              }
+            })
+      }
+      // DAGScheduler only waits for a limited amount of time for the merge results.
+      // It will attempt to submit the next stage(s) irrespective of whether merge results
+      // from all shuffle services are received or not.
+      // TODO what are the reasonable configurations for the 2 timeouts? When # mappers
+      // TODO and # reducers for a shuffle is really large, and if the merge ratio is not
+      // TODO high enough, the MergeStatuses to be retrieved from 1 shuffle service could
+      // TODO be pretty large (10s MB to 100s MB). How to properly handle this scenario?
+      try {
+        Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, TimeUnit.SECONDS)
+      } catch {
+        case _: TimeoutException =>
+          logInfo(s"Timed out on waiting for merge results from all " +
+            s"$numMergers mergers for shuffle $shuffleId")
+          timedOut.set(true)
+          eventProcessLoop.post(ShuffleMergeFinalized(stage))
+      }
+    }
+  }
+
+  private def processShuffleMapStageCompletion(shuffleStage: ShuffleMapStage): Unit = {

Review comment:
       review note: no changes here. Method extracted from `handleTaskCompletion`

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1678,33 +1703,10 @@ private[spark] class DAGScheduler(
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-              markStageAsFinished(shuffleStage)
-              logInfo("looking for newly runnable stages")
-              logInfo("running: " + runningStages)
-              logInfo("waiting: " + waitingStages)
-              logInfo("failed: " + failedStages)
-
-              // This call to increment the epoch may not be strictly necessary, but it is retained
-              // for now in order to minimize the changes in behavior from an earlier version of the
-              // code. This existing behavior of always incrementing the epoch following any
-              // successful shuffle map stage completion may have benefits by causing unneeded
-              // cached map outputs to be cleaned up earlier on executors. In the future we can
-              // consider removing this call, but this will require some extra investigation.
-              // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
-              mapOutputTracker.incrementEpoch()
-
-              clearCacheLocs()
-
-              if (!shuffleStage.isAvailable) {
-                // Some tasks had failed; let's resubmit this shuffleStage.
-                // TODO: Lower-level scheduler should also deal with this
-                logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
-                  ") because some of its tasks had failed: " +
-                  shuffleStage.findMissingPartitions().mkString(", "))
-                submitStage(shuffleStage)
+              if (pushBasedShuffleEnabled) {

Review comment:
       This is getting handled within `finalizeShuffleMerge` - any issues with pulling it out like @otterc suggested ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2004,6 +2006,142 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    // TODO Use the default single threaded scheduler or extend ThreadUtils to
+    // TODO support the multi-threaded scheduler?
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+    externalShuffleClient.foreach { shuffleClient =>
+      val shuffleId = stage.shuffleDep.shuffleId
+      val numMergers = stage.shuffleDep.getMergerLocs.length
+      val numResponses = new AtomicInteger()
+      val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+      val timedOut = new AtomicBoolean()
+
+      // NOTE: This is a defensive check to post finalize event if numMergers is 0 (i.e. no shuffle
+      // service available).
+      if (numMergers == 0) {
+        eventProcessLoop.post(ShuffleMergeFinalized(stage))
+        return
+      }
+
+      def increaseAndCheckResponseCount: Unit = {
+        if (numResponses.incrementAndGet() == numMergers) {
+          // Since this runs in the netty client thread and is outside of DAGScheduler
+          // event loop, we only post ShuffleMergeFinalized event into the event queue.
+          // The processing of this event should be done inside the event loop, so it
+          // can safely modify scheduler's internal state.

Review comment:
       nit: Move this comment to the first case of `post` within this method ?



