wankunde commented on code in PR #37533: URL: https://github.com/apache/spark/pull/37533#discussion_r955564176
########## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ##########

```diff
@@ -282,13 +286,19 @@ private[spark] class DAGScheduler(
       None
     }
 
-  // Use multi-threaded scheduled executor. The merge finalization task could take some time,
-  // depending on the time to establish connections to mergers, and the time to get MergeStatuses
-  // from all the mergers.
+  // Use multi-threaded scheduled executor. The merge finalization task (per stage) takes up to
+  // PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
   private val shuffleMergeFinalizeScheduler =
     ThreadUtils.newDaemonThreadPoolScheduledExecutor("shuffle-merge-finalizer",
       shuffleMergeFinalizeNumThreads)
 
+  // The merge finalization task (per stage) will submit an asynchronous task to send the
+  // finalize RPC to the merger location and then get the MergeStatus from the merger. This
+  // task won't stop after PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT.
+  private val shuffleSendFinalizeRPCContext =
+    ExecutionContext.fromExecutor(ThreadUtils.newDaemonFixedThreadPool(
+      shuffleSendFinalizeRPCThreads, "send-shuffle-merge-finalize-rpc"))
```

Review Comment:
Because I think the number of send-RPC tasks in `shuffleSendFinalizeRPCContext`, and their total elapsed time, should be bigger than for the tasks in `shuffleMergeFinalizeScheduler`.
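To illustrate the two-pool split the diff introduces, here is a minimal, standalone sketch using plain `java.util.concurrent` in place of Spark's internal `ThreadUtils` helpers. The object, method, and pool names (`FinalizePoolsSketch`, `sendFinalizeRpc`, `finalizeStage`, the pool sizes) are illustrative assumptions, not Spark's actual API: a small scheduled pool drives per-stage finalization with a timeout, while a separate, larger fixed pool carries the finalize RPC futures, which are not cancelled when that timeout fires.

```scala
import java.util.concurrent.{Callable, Executors, ScheduledExecutorService,
  ScheduledFuture, ThreadFactory, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hedged sketch of the two-pool pattern; names and sizes are illustrative only.
object FinalizePoolsSketch {
  // Daemon threads, mirroring Spark's newDaemonThreadPoolScheduledExecutor /
  // newDaemonFixedThreadPool helpers.
  private def daemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val count = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${count.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }

  // Small scheduled pool: one finalization task per stage, bounded by the
  // merge-results timeout (PUSH_BASED_SHUFFLE_MERGE_RESULTS_TIMEOUT in Spark).
  private val finalizeScheduler: ScheduledExecutorService =
    Executors.newScheduledThreadPool(2, daemonFactory("shuffle-merge-finalizer"))

  // Larger fixed pool backing the finalize RPCs; these futures are not cancelled
  // when the per-stage timeout fires, so they may keep running afterwards.
  private implicit val sendRpcContext: ExecutionContext =
    ExecutionContext.fromExecutor(
      Executors.newFixedThreadPool(8, daemonFactory("send-shuffle-merge-finalize-rpc")))

  // Hypothetical stand-in for "send the finalize RPC to one merger and get its
  // MergeStatus back"; here a merger is just an Int and the status echoes it.
  def sendFinalizeRpc(mergerId: Int): Future[Int] = Future {
    Thread.sleep(10) // simulated network round trip
    mergerId
  }

  // Wait up to `timeout` for each RPC result; drop stragglers but keep the rest.
  // The straggler futures continue running on sendRpcContext regardless.
  def finalizeStage(mergers: Seq[Int], timeout: FiniteDuration): Seq[Int] =
    mergers.map(sendFinalizeRpc).flatMap { f =>
      try Some(Await.result(f, timeout))
      catch { case _: TimeoutException => None }
    }

  // Run a stage's finalization on the scheduled pool after `delay`, the way the
  // DAGScheduler schedules merge finalization rather than running it inline.
  def scheduleFinalization(mergers: Seq[Int], delay: FiniteDuration,
      timeout: FiniteDuration): ScheduledFuture[Seq[Int]] =
    finalizeScheduler.schedule(
      new Callable[Seq[Int]] { override def call(): Seq[Int] = finalizeStage(mergers, timeout) },
      delay.toMillis, TimeUnit.MILLISECONDS)
}
```

The point of the split, as the review comment argues, is capacity: many short-lived RPC tasks fan out per finalization task, so the RPC pool needs more threads (and absorbs more total elapsed time) than the scheduler pool driving one finalization per stage.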
--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org