[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-04 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626222342



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -689,7 +716,7 @@ private[spark] class DAGScheduler(
         dep match {
           case shufDep: ShuffleDependency[_, _, _] =>
             val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
-            if (!mapStage.isAvailable) {
+            if (!mapStage.isAvailable || !mapStage.isMergeFinalized) {

Review comment:
   There is already a comment on the `isMergeFinalized` API, similar to the one on `isAvailable`. Do you think that's not enough?
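As a rough illustration of the gate in the hunk above, here is a minimal Scala sketch of when a parent map stage still counts as missing. `MapStageState` and `MissingParents` are hypothetical simplified stand-ins, not Spark classes, and treating a stage without push-based merge as already finalized is an assumption made for the sketch.

```scala
// Hypothetical, simplified view of a shuffle map stage; not Spark's ShuffleMapStage.
final case class MapStageState(
    numPartitions: Int,
    numAvailableOutputs: Int,
    mergeEnabled: Boolean,
    mergeFinalized: Boolean) {
  // Same idea as isAvailable: every map partition has a registered output.
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
  // Assumption: with push-based merge disabled there is nothing to wait for.
  def isMergeFinalized: Boolean = !mergeEnabled || mergeFinalized
}

object MissingParents {
  // A parent stage is still "missing" (its child stages must wait) until both hold.
  def isMissing(stage: MapStageState): Boolean =
    !stage.isAvailable || !stage.isMergeFinalized
}
```

With merge enabled, a stage whose map outputs are all available is still treated as missing until its merge is finalized.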




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-04 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626223337



##
File path: core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
##
@@ -1,3 +1,4 @@
 org.apache.spark.scheduler.DummyExternalClusterManager
 org.apache.spark.scheduler.MockExternalClusterManager
 org.apache.spark.scheduler.CSMockExternalClusterManager
+org.apache.spark.scheduler.PushBasedClusterManager

Review comment:
   Yes, this is needed for the tests.
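For context on why the test resource needs this entry: as far as I understand, `SparkContext` discovers `ExternalClusterManager` implementations through `java.util.ServiceLoader` (which reads this `META-INF/services` file) and picks the one whose `canCreate` accepts the master URL, here the `pushbasedshuffleclustermanager` URL set by the test suite. The sketch below models only that selection step; `ClusterManagerLike`, `PushBasedTestManager`, `OtherTestManager` and the `hypothetical-other-master` URL are all hypothetical.

```scala
// Hypothetical stand-in for Spark's ExternalClusterManager trait (only the selection hook).
trait ClusterManagerLike {
  def canCreate(masterURL: String): Boolean
}

// Hypothetical test manager claiming the master URL used by the push-based shuffle tests.
class PushBasedTestManager extends ClusterManagerLike {
  override def canCreate(masterURL: String): Boolean =
    masterURL == "pushbasedshuffleclustermanager"
}

// Hypothetical second registration, standing in for the other entries in the file.
class OtherTestManager extends ClusterManagerLike {
  override def canCreate(masterURL: String): Boolean = masterURL == "hypothetical-other-master"
}

object ClusterManagerSelection {
  // The candidates would come from java.util.ServiceLoader reading META-INF/services;
  // here they are passed in explicitly. Ambiguous matches are rejected.
  def select(candidates: Seq[ClusterManagerLike], masterURL: String): Option[ClusterManagerLike] = {
    val matching = candidates.filter(_.canCreate(masterURL))
    require(matching.size <= 1, s"Multiple cluster managers registered for $masterURL")
    matching.headOption
  }
}
```

Without the new line in the services file, no registered manager would accept the test master URL.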







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-04 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626224298



##
File path: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
##
@@ -3393,6 +3406,271 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 assert(rprofsE === Set())
   }
 
+  private def initPushBasedShuffleConfs(conf: SparkConf) = {
+    conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+    conf.set(config.PUSH_BASED_SHUFFLE_ENABLED, true)
+    conf.set("spark.master", "pushbasedshuffleclustermanager")
+  }
+
+  test("SPARK-32920: shuffle merge finalization") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on shuffleDep, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: merger locations not empty") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+
+    // Submit a reduce job that depends on shuffleDep, which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    val shuffleStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.shuffleDep.getMergerLocs.nonEmpty)
+
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) == parts)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: merger locations reuse from shuffle dependency") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS, 3)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 2
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, parts)
+    assert(shuffleDep.getMergerLocs.nonEmpty)
+    val mergerLocs = shuffleDep.getMergerLocs
+    completeNextResultStageWithSuccess(1, 0)
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage.  If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(scheduler.stageIdToStage(2)
+      .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs.nonEmpty)
+    val newMergerLocs = scheduler.stageIdToStage(2)
+      .asInstanceOf[ShuffleMapStage].shuffleDep.getMergerLocs
+
+    // Check if the same merger locs are reused for the new stage with shared shuffle dependency
+    assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host))
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-32920: Disable shuffle merge due to not enough mergers available") {
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 6)
+    DAGSchedulerSuite.clearMergerLocs
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
+    val parts = 7
+
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd =

[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-05 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626773776



##
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##
@@ -102,6 +102,12 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    */
   private[spark] var mergerLocs: Seq[BlockManagerId] = Nil
 
+  /**
+   * Stores the information about whether the shuffle merge is finalized for the shuffle map stage
+   * associated with this shuffle dependency
+   */
+  private[this] var shuffleMergedFinalized: Boolean = false

Review comment:
   I think this is used within the `ShuffleDependency` class only, but the `ShuffleDependency` class is defined in the `Dependency.scala` file.
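The point about scope can be illustrated with a small sketch: `private[this]` restricts a field to the owning instance, so all other code, including other instances of the same class, has to go through accessor methods. `MergeStateHolder` and its fields are hypothetical; the method names only loosely echo `shuffleMergeEnabled`, `setShuffleMergeEnabled`, `shuffleMergeFinalized` and `markShuffleMergeFinalized()` that appear elsewhere in this PR.

```scala
// Hypothetical, trimmed-down stand-in for the merge bookkeeping on ShuffleDependency.
class MergeStateHolder {
  // private[this] (object-private): only this instance can read or write these fields
  // directly; everything else goes through the accessors below.
  private[this] var mergeEnabled: Boolean = true
  private[this] var mergeFinalized: Boolean = false

  def shuffleMergeEnabled: Boolean = mergeEnabled
  def setShuffleMergeEnabled(enabled: Boolean): Unit = { mergeEnabled = enabled }

  def shuffleMergeFinalized: Boolean = mergeFinalized
  def markShuffleMergeFinalized(): Unit = { mergeFinalized = true }
}
```

Keeping the mutable flag object-private means the only way to flip it is the explicit `mark...`/`set...` methods, which keeps state changes easy to audit.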







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-05 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r626774584



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -2075,6 +2075,25 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT =
+    ConfigBuilder("spark.shuffle.push.merge.results.timeout")
+      .doc("Specify the max amount of time DAGScheduler waits for the merge results from " +
+        "all remote shuffle services for a given shuffle. DAGScheduler will start to submit " +
+        "following stages if not all results are received within the timeout.")
+      .version("3.1.0")
+      .stringConf
+      .createWithDefault("10s")
+
+  private[spark] val PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT =
+    ConfigBuilder("spark.shuffle.push.merge.finalize.timeout")
+      .doc("Specify the amount of time DAGScheduler waits after all mappers finish for " +
+        "a given shuffle map stage before it starts sending merge finalize requests to " +
+        "remote shuffle services. This allows the shuffle services some extra time to " +
+        "merge as many blocks as possible.")
+      .version("3.1.0")
+      .stringConf
+      .createWithDefault("10s")
+

Review comment:
   I think this is currently hard to tune, but once the changes for `SPARK-33701` (adaptive merge finalization) are in, this should mostly be taken care of.
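Part of the tuning difficulty is that the two fixed timeouts add up: the scheduler first waits the finalize timeout after all mappers finish, then waits up to the results timeout for merge responses. A minimal sketch of that arithmetic follows; the time-string parser is a hypothetical stand-in for `JavaUtils.timeStringAsSec` (which accepts more formats), and the bound ignores RPC and event-processing time.

```scala
import java.util.concurrent.TimeUnit

object PushMergeTimeouts {
  // Hypothetical parser for a few time-string suffixes ("500ms", "10s", "2min", "1h");
  // Spark's JavaUtils.timeStringAsSec accepts more formats than this.
  private val TimeString = """(\d+)\s*(ms|s|min|h)""".r

  def toSeconds(value: String): Long = value.trim.toLowerCase match {
    case TimeString(n, "ms")  => TimeUnit.MILLISECONDS.toSeconds(n.toLong)
    case TimeString(n, "s")   => n.toLong
    case TimeString(n, "min") => TimeUnit.MINUTES.toSeconds(n.toLong)
    case TimeString(n, "h")   => TimeUnit.HOURS.toSeconds(n.toLong)
    case other                => throw new IllegalArgumentException(s"Unsupported time string: $other")
  }

  // Rough upper bound on the delay added before the next stage can be submitted:
  // the finalize wait after the last mapper, then up to the results timeout.
  def worstCaseExtraDelaySec(finalizeTimeout: String, resultsTimeout: String): Long =
    toSeconds(finalizeTimeout) + toSeconds(resultsTimeout)
}
```

With both defaults at `10s`, `worstCaseExtraDelaySec("10s", "10s")` gives about 20 seconds per shuffle map stage, regardless of how quickly the shuffle services actually finish merging.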







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-07 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r628401105



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -254,6 +259,28 @@ private[spark] class DAGScheduler(
   private val blockManagerMasterDriverHeartbeatTimeout =
     sc.getConf.get(config.STORAGE_BLOCKMANAGER_MASTER_DRIVER_HEARTBEAT_TIMEOUT).millis
 
+  private val shuffleMergeResultsTimeoutSec =
+    JavaUtils.timeStringAsSec(sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT))
+
+  private val shuffleMergeFinalizeWaitSec =
+    JavaUtils.timeStringAsSec(sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT))
+
+  // lazy initialized so that the shuffle client can be properly initialized
+  private lazy val externalShuffleClient: Option[ExternalBlockStoreClient] =
+    if (pushBasedShuffleEnabled) {
+      val transConf = SparkTransportConf.fromSparkConf(sc.conf, "shuffle", 1)
+      val shuffleClient = new ExternalBlockStoreClient(transConf, env.securityManager,
+        env.securityManager.isAuthenticationEnabled(),
+        sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
+      shuffleClient.init(sc.conf.getAppId)
+      Some(shuffleClient)
+    } else {
+      None
+    }
+
+  private val shuffleMergeFinalizeScheduler =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-merge-finalizer")

Review comment:
   Changed it to use `newDaemonThreadPoolScheduledExecutor` with 8 threads. I don't think this needs to be configurable; let me know your thoughts.
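A minimal sketch of a fixed-size pool of daemon threads for delayed finalize tasks, using only `java.util.concurrent`. `FinalizerPool.newDaemonScheduledPool` is a hypothetical helper, not Spark's `ThreadUtils.newDaemonThreadPoolScheduledExecutor`, whose exact settings may differ; the thread count of 8 mirrors the comment above.

```scala
import java.util.concurrent.{Callable, Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FinalizerPool {
  // Hypothetical helper: a fixed-size scheduled pool whose threads are daemons (so a pending
  // finalize task never keeps the driver JVM alive) and carry a recognizable name prefix.
  def newDaemonScheduledPool(prefix: String, numThreads: Int): ScheduledExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true)
        t
      }
    }
    Executors.newScheduledThreadPool(numThreads, factory)
  }
}
```

More than one thread matters here because `finalizeShuffleMerge` blocks on the merge results for up to the results timeout; with a single thread, one slow stage would delay finalization of other stages finishing at the same time.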







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-10 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r629844923



##
File path: .idea/vcs.xml
##
@@ -1,36 +0,0 @@
-

Review comment:
   Somehow my IDEA file got added and pushed. I thought I had removed it, but let me check again.







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-10 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r629845232



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1271,21 +1302,28 @@ private[spark] class DAGScheduler(
    * locations for block push/merge by getting the historical locations of past executors.
    */
   private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
-    // TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize
-    // TODO changes we cannot disable shuffle merge for the retry/reuse cases
-    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
-      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
-
-    if (mergerLocs.nonEmpty) {
-      stage.shuffleDep.setMergerLocs(mergerLocs)
-      logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
-        s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
-
-      logDebug("List of shuffle push merger locations " +
-        s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
-    } else {
-      logInfo("No available merger locations." +
-        s" Push-based shuffle disabled for $stage (${stage.name})")
+    if (stage.shuffleDep.shuffleMergeEnabled && !stage.shuffleDep.shuffleMergeFinalized
+      && stage.shuffleDep.getMergerLocs.isEmpty) {
+      val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+        stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+      if (mergerLocs.nonEmpty) {
+        stage.shuffleDep.setMergerLocs(mergerLocs)
+        logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
+          s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
+
+        logDebug("List of shuffle push merger locations " +
+          s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+      } else {
+        stage.shuffleDep.setShuffleMergeEnabled(false)
+        logInfo(s"Push-based shuffle disabled for $stage (${stage.name})")
+      }
+    } else if (stage.shuffleDep.shuffleMergeFinalized) {
+      // Disable Shuffle merge for the retry/reuse of the same shuffle dependency if it has
+      // already been merge finalized. If the shuffle dependency was previously assigned merger
+      // locations but the corresponding shuffle map stage did not complete successfully, we
+      // would still enable push for its retry.

Review comment:
   Yes, we are disabling merge in those cases since the merge is already finalized.
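The branching in `prepareShuffleServicesForShuffleMapStage` can be summarised with a small model. `DepState`, `PrepareMergers` and the single `minThreshold` check are hypothetical simplifications: the real merger selection happens in the scheduler backend and may take more inputs than a static threshold. The threshold values in the usage mirror the tests in this PR (3 and 6, with 5 hosts).

```scala
// Hypothetical, simplified view of the merge-related state on a shuffle dependency.
final case class DepState(
    mergeEnabled: Boolean,
    mergeFinalized: Boolean,
    mergerLocs: Seq[String])

object PrepareMergers {
  // Assumption: the backend returns no mergers when fewer than minThreshold hosts are available.
  def availableMergers(hosts: Seq[String], minThreshold: Int): Seq[String] =
    if (hosts.size >= minThreshold) hosts else Seq.empty

  def prepare(dep: DepState, hosts: Seq[String], minThreshold: Int): DepState =
    if (dep.mergeEnabled && !dep.mergeFinalized && dep.mergerLocs.isEmpty) {
      val locs = availableMergers(hosts, minThreshold)
      if (locs.nonEmpty) dep.copy(mergerLocs = locs)
      else dep.copy(mergeEnabled = false) // not enough mergers: fall back to regular shuffle
    } else if (dep.mergeFinalized) {
      // Retry/reuse of an already merge-finalized dependency: do not push again.
      dep.copy(mergeEnabled = false)
    } else {
      // Merge disabled, or locations assigned by an unfinished earlier attempt: reuse as-is.
      dep
    }
}
```

The last branch is what lets a retried, not-yet-finalized stage keep the merger locations it was first given.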







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-10 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r629848900



##
File path: .idea/vcs.xml
##
@@ -1,36 +0,0 @@
-

Review comment:
   Sorry, my bad. I think it was added as part of `[SPARK-35223] Add IssueNavigationLink`, and I assumed I had added it by mistake. Fixed it; it should be good now.







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-25 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r639104565



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2004,6 +2020,131 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))

Review comment:
   Addressed this comment after discussing offline with @mridulm. Mridul, I tried adding a test for the cancellation, but `DAGSchedulerEventProcessLoopTester` is not asynchronous: it simply forwards each event for immediate processing. Let me know if you have other ideas for testing this particular situation.
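One way to model the cancellation that the SPARK-35536 TODO refers to is to keep the `ScheduledFuture` returned for each pending finalize task and cancel it when the stage is cancelled. The sketch below is hypothetical (`FinalizeTaskTracker` is not part of Spark); because it schedules with a long delay, a unit-level check of the cancel path does not depend on the event loop being asynchronous.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: remembers the pending finalize task of each stage so it can be cancelled.
class FinalizeTaskTracker(scheduler: ScheduledExecutorService) {
  private val pending = new ConcurrentHashMap[Int, ScheduledFuture[_]]()

  def schedule(stageId: Int, delayMs: Long)(action: () => Unit): Unit = {
    val future = scheduler.schedule(new Runnable {
      override def run(): Unit = {
        pending.remove(stageId) // the task is starting, so it can no longer be cancelled
        action()
      }
    }, delayMs, TimeUnit.MILLISECONDS)
    pending.put(stageId, future)
  }

  /** Cancels the stage's pending finalize task; returns true only if it had not started yet. */
  def cancel(stageId: Int): Boolean = {
    val future = pending.remove(stageId)
    future != null && future.cancel(false)
  }
}
```

A cancelled stage would call `cancel(stageId)` from the event loop, so no finalize requests are sent for it once the wait period expires.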







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-26 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r640027450



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
     }
   }
 
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+      mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses)
+    }
+  }
+
+  private[scheduler] def handleShuffleMergeFinalized(
+      stage: ShuffleMapStage): Unit = {
+    // Only update MapOutputTracker metadata if the stage is still active. i.e not cancelled.
+    if (runningStages.contains(stage)) {
+      stage.shuffleDep.markShuffleMergeFinalized()
+      processShuffleMapStageCompletion(stage)
+    } else {
+      mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)

Review comment:
   The stage stays active until the shuffle merge is finalized, and only then do we process the map stage completion, right? So if the stage is no longer among the running stages when we handle the shuffle merge finalization, we need to unregister the merge results, don't we?
   
   Can you think of a scenario where the stage is not among the running stages and the shuffle merge still gets finalized? Ideally this should not happen.
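To make the two paths discussed here concrete, the following single-threaded sketch models the handlers from the hunk above: merge statuses are registered only while the stage is running and not yet finalized, and a finalize event for a stage that is no longer running drops its merge results. `MergeEventHandler` and its plain collections are hypothetical stand-ins for `DAGScheduler` state and `MapOutputTracker`.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded model of handleRegisterMergeStatuses / handleShuffleMergeFinalized.
class MergeEventHandler {
  val runningStages = mutable.Set.empty[Int]
  val finalizedStages = mutable.Set.empty[Int]
  // shuffleId -> (reduceId -> merger host); stands in for MapOutputTracker merge results.
  val mergeResults = mutable.Map.empty[Int, mutable.Map[Int, String]]
  val completedStages = mutable.ListBuffer.empty[Int]

  def handleRegisterMergeStatuses(stageId: Int, shuffleId: Int, statuses: Seq[(Int, String)]): Unit =
    if (runningStages.contains(stageId) && !finalizedStages.contains(stageId)) {
      mergeResults.getOrElseUpdate(shuffleId, mutable.Map.empty) ++= statuses
    } // late statuses (after finalization or cancellation) are ignored

  def handleShuffleMergeFinalized(stageId: Int, shuffleId: Int): Unit =
    if (runningStages.contains(stageId)) {
      finalizedStages += stageId
      runningStages -= stageId
      completedStages += stageId // stands in for processShuffleMapStageCompletion
    } else {
      // The stage is no longer running (e.g. cancelled): drop its merge results.
      mergeResults.remove(shuffleId)
    }
}
```

In this model a finalize event for a non-running stage can only come from a cancellation racing with finalization, which is the corner case being discussed.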







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-27 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r641094842



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2136,9 +2137,24 @@ private[spark] class DAGScheduler(
     }
   }
 
-  private[scheduler] def handleShuffleMergeFinalized(stage: ShuffleMapStage): Unit = {
-    stage.shuffleDep.markShuffleMergeFinalized
-    processShuffleMapStageCompletion(stage)
+  private[scheduler] def handleRegisterMergeStatuses(
+      stage: ShuffleMapStage,
+      mergeStatuses: Seq[(Int, MergeStatus)]): Unit = {
+    // Register merge statuses if the stage is still running and shuffle merge is not finalized yet.
+    if (runningStages.contains(stage) && !stage.shuffleDep.shuffleMergeFinalized) {
+      mapOutputTracker.registerMergeResults(stage.shuffleDep.shuffleId, mergeStatuses)
+    }
+  }
+
+  private[scheduler] def handleShuffleMergeFinalized(
+      stage: ShuffleMapStage): Unit = {
+    // Only update MapOutputTracker metadata if the stage is still active. i.e not cancelled.
+    if (runningStages.contains(stage)) {
+      stage.shuffleDep.markShuffleMergeFinalized()
+      processShuffleMapStageCompletion(stage)
+    } else {
+      mapOutputTracker.unregisterAllMergeResult(stage.shuffleDep.shuffleId)

Review comment:
   Discussed offline with @mridulm: there are currently a few corner cases that need to be thought through carefully before adopting this behavior. Created a TODO and a corresponding follow-up JIRA: https://issues.apache.org/jira/browse/SPARK-35549







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-27 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r641095507



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2004,6 +2020,131 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))

Review comment:
   Added tests covering stage cancellation, barrier stages, late arrival of merge results, etc.
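The fan-out/fan-in in the `finalizeShuffleMerge` change of this PR, including how late merge results are ignored after the timeout, can be sketched with `java.util.concurrent.CompletableFuture` in place of Guava's `SettableFuture` and `Futures.allAsList`. `FinalizeListener` and `FinalizeFanIn` are hypothetical; each merger is modelled as a function that eventually invokes the listener. Unlike the PR code, the sketch guards `onFinalized` so it fires at most once, even if a late response races with the timeout or there are no mergers at all.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical callback interface, loosely modelled on MergeFinalizerListener.
trait FinalizeListener {
  def onSuccess(): Unit
  def onFailure(e: Throwable): Unit
}

object FinalizeFanIn {
  /** Sends one request per merger; returns true if all responded before timeoutMs. */
  def finalizeAll(
      mergers: Seq[FinalizeListener => Unit],
      timeoutMs: Long,
      onFinalized: () => Unit): Boolean = {
    val numMergers = mergers.size
    val numResponses = new AtomicInteger()
    val timedOut = new AtomicBoolean(false)
    val finalizedOnce = new AtomicBoolean(false)
    val results = Seq.fill(numMergers)(new CompletableFuture[java.lang.Boolean]())

    // Fire the completion callback at most once, whichever path gets there first.
    def finalizeOnce(): Unit =
      if (finalizedOnce.compareAndSet(false, true)) onFinalized()

    def increaseAndCheckResponseCount(): Unit =
      if (numResponses.incrementAndGet() == numMergers) finalizeOnce()

    mergers.zipWithIndex.foreach { case (sendRequest, index) =>
      sendRequest(new FinalizeListener {
        override def onSuccess(): Unit = if (!timedOut.get()) {
          increaseAndCheckResponseCount()
          results(index).complete(true)
        }
        // Complete (rather than fail) the future so one failing merger does not
        // make the caller stop waiting for the remaining ones.
        override def onFailure(e: Throwable): Unit = if (!timedOut.get()) {
          increaseAndCheckResponseCount()
          results(index).complete(false)
        }
      })
    }

    try {
      CompletableFuture.allOf(results: _*).get(timeoutMs, TimeUnit.MILLISECONDS)
      finalizeOnce() // no-op unless there were zero mergers
      true
    } catch {
      case _: TimeoutException =>
        timedOut.set(true) // late responses are ignored from now on
        finalizeOnce()
        false
    }
  }
}
```

A merger that never answers only delays the next stage by the timeout; a merger that fails counts as a response immediately.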







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-05-28 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r641707976



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2000,6 +2023,147 @@ private[spark] class DAGScheduler(
     }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    logInfo(("%s (%s) scheduled for finalizing" +
+      " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+    shuffleMergeFinalizeScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = finalizeShuffleMerge(stage)
+      },
+      shuffleMergeFinalizeWaitSec,
+      TimeUnit.SECONDS
+    )
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+    logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+    externalShuffleClient.foreach { shuffleClient =>
+      val shuffleId = stage.shuffleDep.shuffleId
+      val numMergers = stage.shuffleDep.getMergerLocs.length
+      val numResponses = new AtomicInteger()
+      val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+      val timedOut = new AtomicBoolean()
+
+      def increaseAndCheckResponseCount(): Unit = {
+        if (numResponses.incrementAndGet() == numMergers) {
+          logInfo("%s (%s) shuffle merge finalized".format(stage, stage.name))
+          // Since this runs in the netty client thread and is outside of DAGScheduler
+          // event loop, we only post ShuffleMergeFinalized event into the event queue.
+          // The processing of this event should be done inside the event loop, so it
+          // can safely modify scheduler's internal state.
+          eventProcessLoop.post(ShuffleMergeFinalized(stage))
+        }
+      }
+
+      stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
+        case (shuffleServiceLoc, index) =>
+          // Sends async request to shuffle service to finalize shuffle merge on that host
+          // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
+          // TODO: during shuffleMergeFinalizeWaitSec
+          shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+            shuffleServiceLoc.port, shuffleId,
+            new MergeFinalizerListener {
+              override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
+                assert(shuffleId == statuses.shuffleId)
+                if (!timedOut.get()) {
+                  eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
+                    convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
+                  increaseAndCheckResponseCount()
+                  results(index).set(true)
+                }
+              }
+
+              override def onShuffleMergeFailure(e: Throwable): Unit = {
+                if (!timedOut.get()) {
+                  logWarning(s"Exception encountered when trying to finalize shuffle " +
+                    s"merge on ${shuffleServiceLoc.host} for shuffle $shuffleId", e)
+                  increaseAndCheckResponseCount()
+                  // Do not fail the future as this would cause dag scheduler to prematurely
+                  // give up on waiting for merge results from the remaining shuffle services
+                  // if one fails
+                  results(index).set(false)
+                }
+              }
+            })
+      }
+      // DAGScheduler only waits for a limited amount of time for the merge results.
+      // It will attempt to submit the next stage(s) irrespective of whether merge results
+      // from all shuffle services are received or not.
+      // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
+      // TODO: for all the stages, adaptively tune timeout for merge finalization
+      try {
+        Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, TimeUnit.SECONDS)
+      } catch {
+        case _: TimeoutException =>
+          logInfo(s"Timed out on waiting for merge results from all " +
+            s"$numMergers mergers for shuffle $shuffleId")
+          timedOut.set(true)
+          eventProcessLoop.post(ShuffleMergeFinalized(stage))
+      }
+    }
+  }
+
+  private def processShuffleMapStageCompletion(shuffleStage: ShuffleMapStage): Unit = {
+    markStageAsFinished(shuffleStage)
+    logInfo("looking for newly runnable stages")
+    lo

[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-01 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r643344276



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1743,6 +1762,10 @@ private[spark] class DAGScheduler(
           } else if (mapIndex != -1) {
             // Mark the map whose fetch failed as broken in the map stage
             mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
+            if (mapStage.shuffleDep.shuffleMergeEnabled) {
+              mapOutputTracker.
+                unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))

Review comment:
   @otterc, one thing I couldn't work out from the internal fix: should we pass `mapId` or `mapIndex` to `unregisterMergeResult` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-01 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r643574768



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1743,6 +1762,10 @@ private[spark] class DAGScheduler(
   } else if (mapIndex != -1) {
 // Mark the map whose fetch failed as broken in the map stage
 mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
+if (mapStage.shuffleDep.shuffleMergeEnabled) {
+  mapOutputTracker.
+unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))

Review comment:
   Thanks. Added a comment to keep it clear as well.







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-01 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r643661092



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -737,25 +737,26 @@ private[spark] class MapOutputTrackerMaster(
   }
 
   /**
-   * Unregisters a merge result corresponding to the reduceId if present. If the optional mapId
-   * is specified, it will only unregister the merge result if the mapId is part of that merge
+   * Unregisters a merge result corresponding to the reduceId if present. If the optional mapIndex
+   * is specified, it will only unregister the merge result if the mapIndex is part of that merge
* result.
*
* @param shuffleId the shuffleId.
* @param reduceId  the reduceId.
* @param bmAddress block manager address.
-   * @param mapId the optional mapId which should be checked to see it was part of the merge
-   *  result.
+   * @param mapIndex  the optional mapIndex which should be checked to see it was part of the
+   *  merge result.
*/
   def unregisterMergeResult(
 shuffleId: Int,
 reduceId: Int,
 bmAddress: BlockManagerId,
-mapId: Option[Int] = None) {
+mapIndex: Option[Int] = None) {
 shuffleStatuses.get(shuffleId) match {
   case Some(shuffleStatus) =>
 val mergeStatus = shuffleStatus.mergeStatuses(reduceId)
-if (mergeStatus != null && (mapId.isEmpty || mergeStatus.tracker.contains(mapId.get))) {
+if (mergeStatus != null &&
+  (mapIndex.isEmpty || mergeStatus.tracker.contains(mapIndex.get))) {
   shuffleStatus.removeMergeResult(reduceId, bmAddress)

Review comment:
   [SPARK-32923](https://issues.apache.org/jira/browse/SPARK-32923) would handle non-deterministic stage retries, right? Do you mean we should remove the `mapOutputTracker.unregisterMergeResult` call in `DAGScheduler`? This change was already added as part of [SPARK-32921](https://issues.apache.org/jira/browse/SPARK-32921).







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-07 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r646974281



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2000,6 +2025,149 @@ private[spark] class DAGScheduler(
 }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+logInfo(("%s (%s) scheduled for finalizing" +
+  " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+shuffleMergeFinalizeScheduler.schedule(
+  new Runnable {
+override def run(): Unit = finalizeShuffleMerge(stage)
+  },
+  shuffleMergeFinalizeWaitSec,
+  TimeUnit.SECONDS
+)
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+externalShuffleClient.foreach { shuffleClient =>
+  val shuffleId = stage.shuffleDep.shuffleId
+  val numMergers = stage.shuffleDep.getMergerLocs.length
+  val numResponses = new AtomicInteger()
+  val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+  val timedOut = new AtomicBoolean()

Review comment:
   After an offline discussion, we simplified it further by posting `ShuffleMergeFinalized` only in the `finally` block, which guarantees that the event is posted exactly once.
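A minimal sketch of that pattern, assuming `java.util.concurrent.CompletableFuture` in place of Guava's `SettableFuture` and `Futures.allAsList`, and a hypothetical `ToyEventLoop` that only counts posts. As in the diff, a failed merger completes its future with `false` rather than exceptionally (with Guava's `allAsList`, a single failed input fails the combined future immediately). The `finally` block posts the event once, whether every result arrived or the wait timed out.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for eventProcessLoop: it only counts how many
// ShuffleMergeFinalized-style events were posted.
final class ToyEventLoop {
  val finalizedPosts = new AtomicInteger()
  def postShuffleMergeFinalized(): Unit = { finalizedPosts.incrementAndGet(); () }
}

object ToyFinalizer {
  // One future per merger; a listener completes it with true on success and
  // false on failure (never exceptionally), like results(index).set(...).
  def newResults(numMergers: Int): IndexedSeq[CompletableFuture[Boolean]] =
    (0 until numMergers).map(_ => new CompletableFuture[Boolean]())

  // Waits at most timeoutMs for all results. Returns true if every merger
  // answered in time; in both cases the finalized event is posted once.
  def awaitMergeResults(
      results: Seq[CompletableFuture[Boolean]],
      timeoutMs: Long,
      loop: ToyEventLoop): Boolean = {
    try {
      CompletableFuture.allOf(results: _*).get(timeoutMs, TimeUnit.MILLISECONDS)
      true
    } catch {
      case _: TimeoutException => false
    } finally {
      loop.postShuffleMergeFinalized()
    }
  }
}
```

Posting from `finally` rather than from both the success path and the `catch` removes the need for a separate `timedOut` flag to avoid a double post.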







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-08 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647758831



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2000,6 +2023,133 @@ private[spark] class DAGScheduler(
 }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+logInfo(("%s (%s) scheduled for finalizing" +
+  " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+shuffleMergeFinalizeScheduler.schedule(
+  new Runnable {
+override def run(): Unit = finalizeShuffleMerge(stage)
+  },
+  shuffleMergeFinalizeWaitSec,
+  TimeUnit.SECONDS
+)
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+externalShuffleClient.foreach { shuffleClient =>
+  val shuffleId = stage.shuffleDep.shuffleId
+  val numMergers = stage.shuffleDep.getMergerLocs.length
+  val numResponses = new AtomicInteger()
+  val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+
+  stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
+case (shuffleServiceLoc, index) =>
+  // Sends async request to shuffle service to finalize shuffle merge on that host
+  // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
+  // TODO: during shuffleMergeFinalizeWaitSec
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
+assert(shuffleId == statuses.shuffleId)
+eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
+  convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
+numResponses.incrementAndGet()
+results(index).set(true)
+  }
+
+  override def onShuffleMergeFailure(e: Throwable): Unit = {
+logWarning(s"Exception encountered when trying to finalize shuffle " +
+  s"merge on ${shuffleServiceLoc.host} for shuffle $shuffleId", e)
+numResponses.incrementAndGet()
+// Do not fail the future as this would cause dag scheduler to prematurely
+// give up on waiting for merge results from the remaining shuffle services
+// if one fails
+results(index).set(false)
+  }
+})
+  }
+  // DAGScheduler only waits for a limited amount of time for the merge results.
+  // It will attempt to submit the next stage(s) irrespective of whether merge results
+  // from all shuffle services are received or not.
+  // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
+  // TODO: for all the stages, adaptively tune timeout for merge finalization
+  try {
+Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, TimeUnit.SECONDS)
+  } catch {
+case _: TimeoutException =>
+  logInfo(s"Timed out on waiting for merge results from all " +
+s"$numMergers mergers for shuffle $shuffleId")
+  } finally {
+eventProcessLoop.post(ShuffleMergeFinalized(stage))
+  }
+}
+  }
+
+  private def processShuffleMapStageCompletion(shuffleStage: ShuffleMapStage): Unit = {
+markStageAsFinished(shuffleStage)
+logInfo("looking for newly runnable stages")
+logInfo("running: " + runningStages)
+logInfo("waiting: " + waitingStages)
+logInfo("failed: " + failedStages)
+
+// This call to increment the epoch may not be strictly necessary, but it is retained
+// for now in order to minimize the changes in behavior from an earlier version of the
+// code. This existing behavior of always incrementing the epoch following any
+// successful shuffle map stage completion may have benefits by causing unneeded
+// cached map outputs to be cleaned up earlier on executors. In the future we can
+// consider removing this call, but this will require some extra investigation.
+// See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
+mapOutputTracker.incrementEpoch()
+
+clearCacheLocs()
+
+ 

[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-08 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647763338



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -254,6 +259,29 @@ private[spark] class DAGScheduler(
   private val blockManagerMasterDriverHeartbeatTimeout =
 
sc.getConf.get(config.STORAGE_BLOCKMANAGER_MASTER_DRIVER_HEARTBEAT_TIMEOUT).millis
 
+  private val shuffleMergeResultsTimeoutSec =
+sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT)
+
+  private val shuffleMergeFinalizeWaitSec =
+sc.getConf.get(config.PUSH_BASED_SHUFFLE_MERGE_FINALIZE_TIMEOUT)
+
+  // Since SparkEnv gets initialized after DAGScheduler, externalShuffleClient needs to be
+  // initialized lazily
+  private lazy val externalShuffleClient: Option[ExternalBlockStoreClient] =
+if (pushBasedShuffleEnabled) {
+  val transConf = SparkTransportConf.fromSparkConf(sc.conf, "shuffle", 1)
+  val shuffleClient = new ExternalBlockStoreClient(transConf, env.securityManager,
+env.securityManager.isAuthenticationEnabled(),
+sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
+  shuffleClient.init(sc.conf.getAppId)
+  Some(shuffleClient)
+} else {
+  None
+}
+
+  private val shuffleMergeFinalizeScheduler =
+ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer", 8)

Review comment:
   Yes, I don't think we need a config for now. If needed, we can always add one in the future.
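For reference, a fixed pool of daemon threads like the one created with `ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer", 8)` can be approximated with plain `java.util.concurrent`. This is a hypothetical sketch, not Spark's `ThreadUtils` implementation: `ToyFinalizeScheduler` and `scheduleFinalize` are illustrative names, and the delay is in milliseconds rather than the seconds used by `shuffleMergeFinalizeWaitSec`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ToyFinalizeScheduler {
  // Fixed-size scheduled pool whose threads are daemons named "<prefix>-<n>",
  // so a finalization still waiting out its delay never blocks JVM exit.
  def newDaemonScheduler(prefix: String, numThreads: Int): ScheduledExecutorService = {
    val counter = new AtomicInteger()
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true)
        t
      }
    }
    Executors.newScheduledThreadPool(numThreads, factory)
  }

  // Hypothetical analogue of scheduleShuffleMergeFinalize: runs `action`
  // once on a pool thread after `waitMs` milliseconds.
  def scheduleFinalize(scheduler: ScheduledExecutorService, waitMs: Long)(
      action: () => Unit): Unit = {
    // An explicit Runnable avoids overload ambiguity with schedule(Callable, ...).
    scheduler.schedule(new Runnable {
      override def run(): Unit = action()
    }, waitMs, TimeUnit.MILLISECONDS)
    ()
  }
}
```

A fixed size of 8 bounds how many stages can be finalizing concurrently; each finalization itself fans out asynchronously to the mergers, so a small pool is plausibly enough without a dedicated config.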







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-08 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647868344



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1298,7 +1329,7 @@ private[spark] class DAGScheduler(
 // `findMissingPartitions()` returns all partitions every time.
 stage match {
   case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
-mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
+mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)

Review comment:
   Added a comment here to handle cleanup of shuffle merge metadata as part of SPARK-32923 (non-deterministic stage retries) and SPARK-35547 (barrier execution mode).
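The difference between `unregisterAllMapOutput` and `unregisterAllMapAndMergeOutput` can be sketched with a hypothetical, simplified status holder (`ToyShuffleStatus` is not a Spark class, and `String`s stand in for map and merge statuses). The assumption illustrated is that merged blocks built from a previous attempt's map outputs should not survive once all map outputs of an indeterminate stage are discarded.

```scala
// Hypothetical, simplified shuffle bookkeeping: one slot per map task (by
// mapIndex) and one slot per reduce partition's merge result.
final class ToyShuffleStatus(numMaps: Int, numReduces: Int) {
  val mapStatuses = new Array[String](numMaps)
  val mergeStatuses = new Array[String](numReduces)

  // Analogue of unregisterAllMapOutput: merge results are left in place.
  def unregisterAllMapOutput(): Unit =
    mapStatuses.indices.foreach(i => mapStatuses(i) = null)

  // Analogue of unregisterAllMapAndMergeOutput: both kinds of output go, so
  // reducers cannot read merged blocks built from the discarded map outputs.
  def unregisterAllMapAndMergeOutput(): Unit = {
    unregisterAllMapOutput()
    mergeStatuses.indices.foreach(i => mergeStatuses(i) = null)
  }
}
```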







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-08 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647868622



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1739,18 +1756,24 @@ private[spark] class DAGScheduler(
   if (mapStage.rdd.isBarrier()) {
 // Mark all the map as broken in the map stage, to ensure retry all the tasks on
 // resubmitted stage attempt.
-mapOutputTracker.unregisterAllMapOutput(shuffleId)
+mapOutputTracker.unregisterAllMapAndMergeOutput(shuffleId)
   } else if (mapIndex != -1) {
 // Mark the map whose fetch failed as broken in the map stage
 mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
+if (mapStage.shuffleDep.shuffleMergeEnabled) {

Review comment:
   Instead of checking the shuffle dependency flag, which can later be set to false, this now checks only the global `pushBasedShuffleEnabled` variable.
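A hypothetical sketch of why the guard matters, assuming merge results were registered while the dependency's flag was `true` and the flag was switched to `false` before the fetch failure. `ToyShuffleDep` and `ToyCleanup` are illustrative only and do not model how or when Spark flips the flag.

```scala
import scala.collection.mutable

// Hypothetical dependency whose merge flag can be switched off after merge
// results for it were already registered.
final class ToyShuffleDep(var shuffleMergeEnabled: Boolean)

// Hypothetical fetch-failure cleanup; mergeResults holds the reduce ids that
// currently have a registered merge result.
final class ToyCleanup(pushBasedShuffleEnabled: Boolean) {
  val mergeResults: mutable.Set[Int] = mutable.Set.empty[Int]

  // Guard keyed on the per-dependency flag (the earlier form of the check).
  def cleanupCheckingDepFlag(dep: ToyShuffleDep, reduceId: Int): Unit =
    if (dep.shuffleMergeEnabled) mergeResults -= reduceId

  // Guard keyed on the global setting (the form described in the comment).
  def cleanupCheckingGlobalFlag(reduceId: Int): Unit =
    if (pushBasedShuffleEnabled) mergeResults -= reduceId
}
```

Under this scenario the per-dependency guard skips the cleanup and leaves a stale merge result behind, while the global guard removes it.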







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-08 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647868756



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1678,38 +1717,16 @@ private[spark] class DAGScheduler(
 }
 
 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-  markStageAsFinished(shuffleStage)
-  logInfo("looking for newly runnable stages")
-  logInfo("running: " + runningStages)
-  logInfo("waiting: " + waitingStages)
-  logInfo("failed: " + failedStages)
-
-  // This call to increment the epoch may not be strictly necessary, but it is retained
-  // for now in order to minimize the changes in behavior from an earlier version of the
-  // code. This existing behavior of always incrementing the epoch following any
-  // successful shuffle map stage completion may have benefits by causing unneeded
-  // cached map outputs to be cleaned up earlier on executors. In the future we can
-  // consider removing this call, but this will require some extra investigation.
-  // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
-  mapOutputTracker.incrementEpoch()
-
-  clearCacheLocs()
-
-  if (!shuffleStage.isAvailable) {
-// Some tasks had failed; let's resubmit this shuffleStage.
-// TODO: Lower-level scheduler should also deal with this
-logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
-  ") because some of its tasks had failed: " +
-  shuffleStage.findMissingPartitions().mkString(", "))
-submitStage(shuffleStage)
+  if (!shuffleStage.isMergeFinalized &&

Review comment:
   Added the `shuffleStage.isAvailable` check here.







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-08 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647868756



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1678,38 +1717,16 @@ private[spark] class DAGScheduler(
 }
 
 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-  markStageAsFinished(shuffleStage)
-  logInfo("looking for newly runnable stages")
-  logInfo("running: " + runningStages)
-  logInfo("waiting: " + waitingStages)
-  logInfo("failed: " + failedStages)
-
-  // This call to increment the epoch may not be strictly necessary, but it is retained
-  // for now in order to minimize the changes in behavior from an earlier version of the
-  // code. This existing behavior of always incrementing the epoch following any
-  // successful shuffle map stage completion may have benefits by causing unneeded
-  // cached map outputs to be cleaned up earlier on executors. In the future we can
-  // consider removing this call, but this will require some extra investigation.
-  // See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
-  mapOutputTracker.incrementEpoch()
-
-  clearCacheLocs()
-
-  if (!shuffleStage.isAvailable) {
-// Some tasks had failed; let's resubmit this shuffleStage.
-// TODO: Lower-level scheduler should also deal with this
-logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
-  ") because some of its tasks had failed: " +
-  shuffleStage.findMissingPartitions().mkString(", "))
-submitStage(shuffleStage)
+  if (!shuffleStage.isMergeFinalized &&

Review comment:
   Added the `shuffleStage.isAvailable` check here. Also consolidated the merge-finalized check by removing `isMergeFinalized` from `ShuffleMapStage` and moving it into `ShuffleDependency` as `shuffleMergeFinalized`.
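Because the full condition is truncated in the diff, the following is only a hypothetical sketch of the decision made once all of a shuffle map stage's tasks finish. The `numMergers > 0` guard and the ordering of the checks are assumptions, not the PR's code: an unavailable stage is resubmitted first, since finalizing a merge while map outputs are missing would be wasted work.

```scala
object ToyStageCompletion {
  sealed trait NextAction
  case object Resubmit extends NextAction
  case object ScheduleMergeFinalize extends NextAction
  case object MarkStageComplete extends NextAction

  // Hypothetical dependency carrying the merge-finalized flag, mirroring the
  // move of isMergeFinalized into ShuffleDependency.shuffleMergeFinalized.
  final case class ToyShuffleDependency(
      shuffleMergeEnabled: Boolean,
      numMergers: Int,
      shuffleMergeFinalized: Boolean)

  def onAllTasksFinished(isAvailable: Boolean, dep: ToyShuffleDependency): NextAction =
    if (!isAvailable) Resubmit // some map outputs are missing
    else if (dep.shuffleMergeEnabled && !dep.shuffleMergeFinalized && dep.numMergers > 0)
      ScheduleMergeFinalize // finalize the merge before the reduce stage runs
    else MarkStageComplete
}
```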







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-08 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647869229



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -2000,6 +2023,133 @@ private[spark] class DAGScheduler(
 }
   }
 
+  /**
+   * Schedules shuffle merge finalize.
+   */
+  private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+logInfo(("%s (%s) scheduled for finalizing" +
+  " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
+shuffleMergeFinalizeScheduler.schedule(
+  new Runnable {
+override def run(): Unit = finalizeShuffleMerge(stage)
+  },
+  shuffleMergeFinalizeWaitSec,
+  TimeUnit.SECONDS
+)
+  }
+
+  /**
+   * DAGScheduler notifies all the remote shuffle services chosen to serve shuffle merge request for
+   * the given shuffle map stage to finalize the shuffle merge process for this shuffle. This is
+   * invoked in a separate thread to reduce the impact on the DAGScheduler main thread, as the
+   * scheduler might need to talk to 1000s of shuffle services to finalize shuffle merge.
+   */
+  private[scheduler] def finalizeShuffleMerge(stage: ShuffleMapStage): Unit = {
+logInfo("%s (%s) finalizing the shuffle merge".format(stage, stage.name))
+externalShuffleClient.foreach { shuffleClient =>
+  val shuffleId = stage.shuffleDep.shuffleId
+  val numMergers = stage.shuffleDep.getMergerLocs.length
+  val numResponses = new AtomicInteger()
+  val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
+
+  stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
+case (shuffleServiceLoc, index) =>
+  // Sends async request to shuffle service to finalize shuffle merge on that host
+  // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
+  // TODO: during shuffleMergeFinalizeWaitSec
+  shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
+shuffleServiceLoc.port, shuffleId,
+new MergeFinalizerListener {
+  override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
+assert(shuffleId == statuses.shuffleId)
+eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
+  convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
+numResponses.incrementAndGet()
+results(index).set(true)
+  }
+
+  override def onShuffleMergeFailure(e: Throwable): Unit = {
+logWarning(s"Exception encountered when trying to finalize shuffle " +
+  s"merge on ${shuffleServiceLoc.host} for shuffle $shuffleId", e)
+numResponses.incrementAndGet()
+// Do not fail the future as this would cause dag scheduler to prematurely
+// give up on waiting for merge results from the remaining shuffle services
+// if one fails
+results(index).set(false)
+  }
+})
+  }
+  // DAGScheduler only waits for a limited amount of time for the merge results.
+  // It will attempt to submit the next stage(s) irrespective of whether merge results
+  // from all shuffle services are received or not.
+  // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
+  // TODO: for all the stages, adaptively tune timeout for merge finalization

Review comment:
   Moved the comment to the right place.







[GitHub] [spark] venkata91 commented on a change in pull request #30691: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-08 Thread GitBox


venkata91 commented on a change in pull request #30691:
URL: https://github.com/apache/spark/pull/30691#discussion_r647869895



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1739,18 +1756,24 @@ private[spark] class DAGScheduler(
   if (mapStage.rdd.isBarrier()) {
 // Mark all the map as broken in the map stage, to ensure retry all the tasks on
 // resubmitted stage attempt.
-mapOutputTracker.unregisterAllMapOutput(shuffleId)
+mapOutputTracker.unregisterAllMapAndMergeOutput(shuffleId)

Review comment:
   Same as above: added a comment here to handle this as part of barrier execution mode support.



