[ https://issues.apache.org/jira/browse/SPARK-49472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878467#comment-17878467 ]

Yuming Wang commented on SPARK-49472:
-------------------------------------

Thanks [~Ngone51]. Our workflow is:
 # Decommission the executor.
 # Wait 2 hours so that downstream stages can consume the shuffle data.
 # Stop the NodeManager, apply the patch (a JDK-related or OS-related patch), then restart the node. This step takes about 0.5 hours.

We do not clean up the shuffle data at any point during this process; we expect to reuse the previously written shuffle data as much as possible.
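
For context, a minimal sketch of the kind of configuration this workflow assumes (illustrative only: the two flags come from the issue description, and the dynamic allocation setting is an assumption based on the "removed due to idle timeout" line in the log below):

{noformat}
// Illustrative SparkConf sketch, not the exact configuration of the job above.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Both stated in the issue description:
  .set("spark.decommission.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")   // external shuffle service, port 7337 in the log
  // Assumption, inferred from the idle-timeout removal in the log:
  .set("spark.dynamicAllocation.enabled", "true")
{noformat}

With this combination, map output written before an executor is decommissioned should remain readable through the shuffle service on that node, which is why we keep the node and its shuffle data around for the 2-hour window.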

> Resubmit the task on executor decommission
> ------------------------------------------
>
>                 Key: SPARK-49472
>                 URL: https://issues.apache.org/jira/browse/SPARK-49472
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0
>            Reporter: Yuming Wang
>            Priority: Major
>         Attachments: stage.png, task.png
>
>
> Tasks are re-run many times and a long-running stage cannot complete when both
> {{spark.decommission.enabled}} and {{spark.shuffle.service.enabled}} are
> enabled.
> !stage.png|height=70,width=600!
> !task.png|height=400,width=400!
> Below is the additional logging we added:
> {noformat}
> ---
>  .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 8 ++++++--
>  1 file changed, 6 insertions(+), 2 deletions(-)
> diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
> index a1a54daf5f8..5846827c832 100644
> --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
> +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
> @@ -1117,6 +1117,7 @@ private[spark] class TaskSetManager(
>  
>    /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
>    override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
> +    logInfo(s"Executor lost: execId: $execId, host: $host, reason: $reason.")
>      // Re-enqueue any tasks with potential shuffle data loss that ran on the failed executor
>      // if this is a shuffle map stage, and we are not using an external shuffle server which
>      // could serve the shuffle outputs or the executor lost is caused by decommission (which
> @@ -1148,8 +1149,11 @@ private[spark] class TaskSetManager(
>              //    This shouldn't not happen ideally since TaskSetManager handles executor lost first
>              //    before DAGScheduler. So the map statues for the successful task must be available
>              //    at this moment. keep it here in case the handling order changes.
> -            locationOpt.exists(_.host != host)
> -
> +            val isShuffleMapOutputLoss = locationOpt.exists(_.host != host)
> +            logInfo(s"Is shuffle map output available: partition id: ${info.partitionId}, " +
> +              s"tid: ${tid}, locationOpt: ${locationOpt}, " +
> +              s"isShuffleMapOutputLoss: ${isShuffleMapOutputLoss}.")
> +            isShuffleMapOutputLoss
>            case _ => false
>          }
>          // We may have a running task whose partition has been marked as successful,
> {noformat}
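>
> The condition this patch instruments is just a host comparison between the registered map output location and the lost executor's host. A small, self-contained sketch of that predicate (the {{Location}} class is a simplified stand-in for Spark's {{BlockManagerId}}, used here only for illustration):
> {noformat}
> // Simplified stand-in for org.apache.spark.storage.BlockManagerId; only the
> // fields relevant to the host comparison are modelled here.
> case class Location(execId: String, host: String, port: Int)
>
> // Mirrors locationOpt.exists(_.host != host) from the hunk above: the map
> // output only counts as still usable if it is registered on a *different*
> // host than the one whose executor was lost.
> def outputMigratedToAnotherHost(locationOpt: Option[Location], lostHost: String): Boolean =
>   locationOpt.exists(_.host != lostHost)
>
> // Values taken from the output below: the location is the external shuffle
> // service (port 7337) on the decommissioned host itself, so the check returns
> // false and the task is re-enqueued, even though that shuffle service could
> // still serve the data while the node stays up.
> val lostHost = "hdc42-mcc10-01-0110-7302-007.company.com"
> val location = Some(Location("1608", lostHost, 7337))
> assert(!outputMigratedToAnotherHost(location, lostHost))
> {noformat}
>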
> Output:
> {noformat}
> 24/08/26 16:56:22 INFO YarnClusterSchedulerBackend: Decommission executors: 1608
> 24/08/26 16:56:22 INFO YarnClusterSchedulerBackend: Notify executor 1608 to decommission.
> 24/08/26 16:56:22 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 30502, None)) as being decommissioning.
> 24/08/26 16:56:22 INFO ExecutorAllocationManager: Executors 1608 removed due to idle timeout.
> 24/08/26 16:56:23 INFO TaskSetManager: Finished task 4992.1 in stage 7.0 (TID 16851) in 807662 ms on hdc42-mcc10-01-0710-4001-001.company.com (executor 1300) (5000/6000)
> 24/08/26 16:56:23 INFO TaskSetManager: Finished task 1335.2 in stage 7.0 (TID 16713) in 903010 ms on hdc42-mcc10-01-0110-7303-009.company.com (executor 1141) (5001/6000)
> 24/08/26 16:56:23 INFO TaskSetManager: Finished task 2290.1 in stage 7.0 (TID 16573) in 1115189 ms on hdc42-mcc10-01-1110-3305-038.company (executor 568) (5002/6000)
> 24/08/26 16:56:23 INFO TaskSetManager: Finished task 349.1 in stage 7.0 (TID 16916) in 777120 ms on hdc42-mcc10-01-0110-5803-003.company.com (executor 1345) (5003/6000)
> 24/08/26 16:56:23 INFO YarnAllocator: Driver requested a total number of 499 executor(s) for resource profile id: 0.
> 24/08/26 16:56:24 INFO YarnClusterScheduler: Executor 1608 on hdc42-mcc10-01-0110-7302-007.company.com is decommissioned after 1.3 s.
> 24/08/26 16:56:24 INFO TaskSetManager: Executor lost: execId: 1608, host: hdc42-mcc10-01-0110-7302-007.company.com, reason: Executor decommission: spark scale down.
> 24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3749, tid: 10302, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
> 24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3749), so marking it as still running.
> 24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 763, tid: 16636, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
> 24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 763), so marking it as still running.
> 24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 2971, tid: 15433, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
> 24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 2971), so marking it as still running.
> 24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 5835, tid: 16587, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
> 24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 5835), so marking it as still running.
> 24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 611, tid: 15118, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
> 24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 611), so marking it as still running.
> 24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3750, tid: 10303, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
> 24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3750), so marking it as still running.
> 24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3740, tid: 13610, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
> 24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3740), so marking it as still running.
> 24/08/26 16:56:24 INFO DAGScheduler: Executor lost: 1608 (epoch 1)
> 24/08/26 16:56:24 INFO BlockManagerMasterEndpoint: Trying to remove executor 1608 from BlockManagerMaster.
> {noformat}
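>
> Note that every partition reported above with {{isShuffleMapOutputLoss: false}} is immediately resubmitted anyway. A small, hypothetical helper (not part of Spark; the log file name is made up) that pairs the two kinds of lines and counts such partitions:
> {noformat}
> // Hypothetical log-scanning sketch, only to illustrate the pattern above.
> import scala.io.Source
>
> val availableRe = """Is shuffle map output available: partition id: (\d+),.*isShuffleMapOutputLoss: (\w+)""".r.unanchored
> val resubmitRe  = """Resubmitted ShuffleMapTask\(\d+, (\d+)\)""".r.unanchored
>
> val lines = Source.fromFile("driver.log").getLines().toSeq  // file name is illustrative
>
> // Partitions whose map output is still registered only on the lost host.
> val notMigrated = lines.collect { case availableRe(pid, "false") => pid.toInt }.toSet
> // Partitions the DAGScheduler resubmitted.
> val resubmitted = lines.collect { case resubmitRe(pid) => pid.toInt }.toSet
>
> println(s"resubmitted despite shuffle data still on the node: ${(notMigrated intersect resubmitted).size}")
> {noformat}
> In the excerpt above this reports 7 partitions (3749, 763, 2971, 5835, 611, 3750, 3740), all resubmitted even though the shuffle service on the decommissioned node still holds their output, which matches the repeated re-runs described in the issue.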


