mridulm commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r954454349


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2259,37 +2259,51 @@ private[spark] class DAGScheduler(
                     }
 
                     override def onShuffleMergeFailure(e: Throwable): Unit = {
+                      if (e.isInstanceOf[IOException]) {
+                        logInfo(s"Failed to connect external shuffle service " +
+                          s"${shuffleServiceLoc.hostPort}")
+                        blockManagerMaster.removeShufflePushMergerLocation(shuffleServiceLoc.host)
+                      }
                     }
                   })
             }
           }
         }, 0, TimeUnit.SECONDS)
       } else {
-        stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
-          case (shuffleServiceLoc, index) =>
-            // Sends async request to shuffle service to finalize shuffle merge on that host
-            // TODO: SPARK-35536: Cancel finalizeShuffleMerge if the stage is cancelled
-            // TODO: during shuffleMergeFinalizeWaitSec
-            shuffleClient.finalizeShuffleMerge(shuffleServiceLoc.host,
-              shuffleServiceLoc.port, shuffleId, shuffleMergeId,
-              new MergeFinalizerListener {
-                override def onShuffleMergeSuccess(statuses: MergeStatuses): Unit = {
-                  assert(shuffleId == statuses.shuffleId)
-                  eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
-                    convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
-                  results(index).set(true)
-                }
+        shuffleMergeFinalizeScheduler.schedule(new Runnable {

Review Comment:
   Thoughts on pushing the finalization send into the threadpool instead?
   
   ```
   val scheduledFutures = stage.shuffleDep.getMergerLocs.zipWithIndex.map {
     case (shuffleServiceLoc, index) =>
       shuffleMergeFinalizeScheduler.schedule(new Runnable() {
         override def run(): Unit = {
           // existing code within the "case"
         }
       }, 0, TimeUnit.SECONDS)
   }.toList
   ```
   
   And if there is a `TimeoutException`, we cancel all the `scheduledFutures` (`scheduledFutures.foreach(_.cancel(true))`).
   
   We should also bump up the default number of threads in this threadpool.
   
   This will make sure that we wait at most `shuffleMergeResultsTimeoutSec` seconds for finalization to complete. In almost all cases `shuffleClient.finalizeShuffleMerge` will be really quick, so the overhead is fairly low. In the rare cases where it is not, we will only block that specific sending thread (all other threads will still send the finalize message to the other merger hosts), and we will always complete within the timeout (and release the blocked threads).
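
To make the idea concrete, here is a rough, self-contained sketch of the pattern, not the actual `DAGScheduler` code. `FinalizeSketch`, `finalizeAll` and the `send` callback are hypothetical names used only for illustration (`send` stands in for the blocking `shuffleClient.finalizeShuffleMerge` call), and `CompletableFuture` stands in for the `results` futures in the diff.

```scala
import java.util.concurrent.{CompletableFuture, Executors, ScheduledFuture, TimeUnit, TimeoutException}

object FinalizeSketch {
  /**
   * Sends one finalize request per merger on its own pool thread and waits at
   * most `timeoutSec` seconds overall. `send` is a hypothetical stand-in for
   * the per-merger finalize call, which may block.
   * Returns, per merger, whether its send completed in time.
   */
  def finalizeAll(mergers: Seq[String], timeoutSec: Long, numThreads: Int)
                 (send: String => Unit): Seq[Boolean] = {
    val pool = Executors.newScheduledThreadPool(numThreads)
    val results = mergers.map(_ => new CompletableFuture[Boolean]())

    // One scheduled task per merger, so a slow host only blocks its own thread.
    val scheduledFutures: List[ScheduledFuture[_]] = mergers.zipWithIndex.map {
      case (merger, index) =>
        pool.schedule(new Runnable {
          override def run(): Unit = {
            send(merger)
            results(index).complete(true)
          }
        }, 0, TimeUnit.SECONDS)
    }.toList

    try {
      // Bounded wait for every send to finish.
      CompletableFuture.allOf(results: _*).get(timeoutSec, TimeUnit.SECONDS)
    } catch {
      case _: TimeoutException =>
        // Interrupt whatever is still blocked so the threads are released.
        scheduledFutures.foreach(_.cancel(true))
    } finally {
      pool.shutdownNow()
    }
    // Sends that never completed are reported as false.
    results.map(_.getNow(false))
  }
}
```

For example, `FinalizeSketch.finalizeAll(Seq("a", "slow", "b"), timeoutSec = 1, numThreads = 3) { m => if (m == "slow") Thread.sleep(10000) }` should return after roughly one second, with only the middle entry reported as not completed. Note that `cancel(true)` only helps if the blocked call responds to thread interruption, which would need to be checked for the real send path.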
   
   Thoughts?


