wankunde commented on code in PR #37533:
URL: https://github.com/apache/spark/pull/37533#discussion_r955564176


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -282,13 +286,19 @@ private[spark] class DAGScheduler(
       None
     }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could take some time,
-  // depending on the time to establish connections to mergers, and the time to get MergeStatuses
-  // from all the mergers.
+  // Use multi-threaded scheduled executor. The merge finalization task (per stage) takes up to
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   private val shuffleMergeFinalizeScheduler =
     ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
       shuffleMergeFinalizeNumThreads)
 
+  // The merge finalization task (per stage) will submit an asynchronous thread to send a finalize
+  // RPC to the merger location and then get MergeStatus from the merger. This thread won't stop
+  // after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
+  private val shuffleSendFinalizeRPCContext =
+    ExecutionContext.fromExecutor(ThreadUtils.newDaemonFixedThreadPool(
+      shuffleSendFinalizeRPCThreads, "send-shuffle-merge-finalize-rpc"))

Review Comment:
   Because I think the number of send-RPC tasks, and their total elapsed time, in `shuffleSendFinalizeRPCContext` should be bigger than for the tasks in `shuffleMergeFinalizeScheduler`.
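
   A minimal sketch of the two-pool pattern this diff introduces, under illustrative assumptions (the object name `FinalizeSketch`, pool sizes, and the 100 ms timeout are hypothetical, not Spark's defaults): the finalization side waits only up to its timeout, while the "RPC" runs on a separate fixed pool and is free to outlive that timeout.

   ```scala
   import java.util.concurrent.Executors
   import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
   import scala.concurrent.duration._

   object FinalizeSketch {
     // Stands in for shuffleSendFinalizeRPCContext: a dedicated fixed pool
     // whose threads are not interrupted when finalization times out.
     private val rpcPool = Executors.newFixedThreadPool(4)
     private implicit val rpcContext: ExecutionContext =
       ExecutionContext.fromExecutor(rpcPool)

     def run(): (Boolean, String) = {
       // Stands in for PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
       val timeout = 100.millis

       // The "RPC" deliberately takes longer than the finalization timeout.
       val rpc: Future[String] = Future {
         Thread.sleep(300)
         "MergeStatus"
       }

       // Finalization waits only up to the timeout; the RPC thread keeps running.
       val timedOut =
         try { Await.result(rpc, timeout); false }
         catch { case _: TimeoutException => true }

       // The RPC still completes later on its own pool.
       val status = Await.result(rpc, 1.second)
       rpcPool.shutdown()
       (timedOut, status)
     }
   }
   ```

   This separation is why the RPC pool can need more threads than the finalizer: each finalization occupies its scheduler slot for at most the timeout, while the send-RPC tasks it spawned may still be in flight.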




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

