mridulm commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r954454349


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2259,37 +2259,51 @@ private[spark] class DAGScheduler(
                     }
 
                     override def onShuffleMergeFailure(e: Throwable): Unit = {
+                      if (e.isInstanceOf[IOException]) {
+                        logInfo(s"Failed to connect external shuffle service " 
+
+                          s"${shuffleServiceLoc.hostPort}")
+                        
blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+                      }
                     }
                   })
             }
           }
         }, 0, TimeUnit.SECONDS)
       } else {
-        stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-          case (shuffleServiceLoc, index) =>
-            // Sends async request to shuffle service to finalize shuffle 
merge on that host
-            // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is 
cancelled
-            // TODO: during shuffleMergeFinalizeWaitSec
-            shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-              shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-              new MergeFinalizerListener {
-                override def onShuffleMergeSuccess(statuses: MergeStatuses): 
Unit = {
-                  assert(shuffleId == statuses.shuffleId)
-                  eventProcessLoop.post(RegisterMergeStatuses(stage, 
MergeStatus.
-                    convertMergeStatusesToMergeStatusArr(statuses, 
shuffleServiceLoc)))
-                  results(index).set(true)
-                }
+        shuffleMergeFinalizeScheduler.schedule(new Runnable {

Review Comment:
   Thoughts on pushing finalization send into the threadpool instead ?
   
   ```
           stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
             case (shuffleServiceLoc, index) =>
               shuffleMergeFinalizeScheduler.schedule(new Runnable() {
                 override def run(): Unit = {
                       // existing code within the "case"
                  
                 }
               }, 0, TimeUnit.SECONDS));
   ```
   
   We should also bump up the default number of threads in this threadpool.
   
   This will make sure that we wait utmost for `shuffleMergeResultsTimeoutSec` 
seconds for finalization to complete.
   
   Thoughts ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to