Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523711623



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
     execCores.map(cores => 
properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for 
the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with dynamic resource allocation kicking in and significantly 
reducing the number
+   * of available active executors, we would still be able to get sufficient 
shuffle service
+   * locations for block push/merge by getting the historical locations of 
past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: 
ShuffleMapStage): Unit = {
+    // TODO SPARK-32920: Handle stage reuse/retry cases separately as without 
finalize
+    // TODO changes we cannot disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo(s"Shuffle merge enabled for $stage (${stage.name}) with" +

Review comment:
       Nit: Change this log message to "Push-based shuffle enabled for stage ..."




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to