[ 
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401514#comment-17401514
 ] 

Saurabh Chawla edited comment on SPARK-30873 at 8/19/21, 7:46 AM:
------------------------------------------------------------------

[~kings129] - The backport of https://issues.apache.org/jira/browse/YARN-6483 and 
https://issues.apache.org/jira/browse/YARN-4676 was done by our Hadoop team. 
This change is needed to get support for the framework 
(getDecommissioningTimeout, setDecommissioningTimeout) that was added to 
NodeReport.java. In this Spark PR we use the NodeReport methods to get the 
decommissioning timeout in YarnAllocator.scala. This change is available from 
hadoop-3.0.1.
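
Below is a minimal sketch of how that timeout could be read on the Spark side, 
using only the public YARN API (AllocateResponse.getUpdatedNodes and 
NodeReport.getDecommissioningTimeout, the latter available from hadoop-3.0.1); 
the handleUpdatedNodes helper is hypothetical, not the code of the PR itself:

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.api.records.{NodeReport, NodeState}

def handleUpdatedNodes(allocateResponse: AllocateResponse): Unit = {
  allocateResponse.getUpdatedNodes.asScala.foreach { report: NodeReport =>
    if (report.getNodeState == NodeState.DECOMMISSIONING) {
      // getDecommissioningTimeout (seconds) was added to NodeReport by
      // YARN-6483 and is only present from hadoop-3.0.1 onwards; it can be
      // null if the RM did not set a timeout for this node.
      val timeoutSecs = Option(report.getDecommissioningTimeout)
      println(s"Node ${report.getNodeId.getHost} is DECOMMISSIONING, " +
        s"timeout=${timeoutSecs.map(_.toString).getOrElse("unknown")}s")
    }
  }
}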

The DECOMMISSIONING node state itself is supported from hadoop-2.8 and 
hadoop-2.9, so once we receive the decommissioning state info from the RM we 
can fall back to a configuration for the decommission timeout per cloud 
provider. For example, an AWS spot node is terminated 120 seconds after 
entering the decommissioning state, so we can configure 110 seconds as the 
decommissioning timeout; the same applies to GCP preemptible VMs.
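
A minimal sketch of such a fallback, using a hypothetical configuration name 
(spark.graceful.decommission.node.timeout is not an existing Spark conf; the 
values are only examples based on the cloud-provider notice periods above):

import org.apache.spark.SparkConf

// Hypothetical conf: seconds between the DECOMMISSIONING signal and node loss.
// 110s leaves a small buffer inside the ~120s AWS spot termination notice;
// a GCP preemptible VM would need a much smaller value (~30s notice).
val conf = new SparkConf()
  .set("spark.graceful.decommission.node.timeout", "110")

val decommissionTimeoutSecs =
  conf.get("spark.graceful.decommission.node.timeout", "120").toInt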

I believe it's better to get the decommissioning timeout from the RM rather 
than doing it on the Spark side through configuration. To get the correct 
behaviour of graceful decommissioning in Spark, it's better to use hadoop-3.0.1 
or a later version of Hadoop.


> Handling Node Decommissioning for Yarn cluster manager in Spark
> ---------------------------------------------------------------
>
>                 Key: SPARK-30873
>                 URL: https://issues.apache.org/jira/browse/SPARK-30873
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, YARN
>    Affects Versions: 3.1.0
>            Reporter: Saurabh Chawla
>            Priority: Major
>
> In many public cloud environments, node loss (in the case of AWS Spot loss, 
> Spot Blocks and GCP preemptible VMs) is a planned and informed activity. 
> The cloud provider informs the cluster manager about the upcoming loss of a 
> node ahead of time. A few examples are listed below:
> a) Spot loss in AWS (2 minutes before the event)
> b) GCP preemptible VM loss (30 seconds before the event)
> c) AWS Spot Block loss with info on the termination time (generally a few 
> tens of minutes before decommission, as configured in YARN)
> This JIRA tries to make Spark leverage this advance knowledge of node loss 
> and adjust the scheduling of tasks to minimise the impact on the application. 
> It is well known that when a host is lost, its executors, their running 
> tasks, their caches and also their shuffle data are lost. This can result in 
> wasted compute and other resources.
> The focus here is to build a framework for YARN that can be extended to 
> other cluster managers to handle such scenarios.
> The framework must handle one or more of the following:
> 1) Prevent new tasks from starting on any executors on decommissioning nodes.
> 2) Decide whether to kill the running tasks so that they can be restarted 
> elsewhere (assuming they will not complete within the deadline), or allow 
> them to continue in the hope that they finish within the deadline.
> 3) Clear the shuffle data entries for the decommissioned node's hostname from 
> the MapOutputTracker to prevent shuffle FetchFailed exceptions (see the 
> sketch after this list). The most significant advantage of unregistering the 
> shuffle outputs is that when Spark schedules the first re-attempt to compute 
> the missing blocks, it notices all of the missing blocks from decommissioned 
> nodes and recovers in only one attempt. This speeds up the recovery process 
> significantly over the current Spark behaviour, where stages might be 
> rescheduled multiple times to recompute missing shuffles from all nodes, and 
> prevents jobs from being stuck for hours failing and recomputing.
> 4) Prevent a stage from being aborted due to FetchFailed exceptions caused by 
> the decommissioning of a node. In Spark, the number of consecutive stage 
> attempts allowed before a stage is aborted is controlled by the config 
> spark.stage.maxConsecutiveAttempts. Not counting fetch failures due to 
> decommissioning of nodes towards stage failure improves the reliability of 
> the system.
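> A minimal sketch of point 3, assuming Spark's existing 
> MapOutputTrackerMaster.removeOutputsOnHost API; the DecommissionShuffleCleaner 
> object and the DecommissionTracker that would call it are hypothetical, and 
> since MapOutputTrackerMaster is private[spark] this sketch is placed in an 
> org.apache.spark package:
>
> package org.apache.spark.scheduler
>
> import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
>
> object DecommissionShuffleCleaner {
>   // Unregister every map output served from `host`, so that the first
>   // re-attempt already sees all missing blocks from the decommissioned node
>   // instead of discovering them through repeated FetchFailed exceptions.
>   def unregisterShuffleOnHost(host: String): Unit = {
>     SparkEnv.get.mapOutputTracker match {
>       case master: MapOutputTrackerMaster => master.removeOutputsOnHost(host)
>       case _ => // executors have a MapOutputTrackerWorker; nothing to do here
>     }
>   }
> }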
> Main components of the change:
> 1) Get the cluster info update from the Resource Manager -> Application 
> Master -> Spark driver.
> 2) A DecommissionTracker, residing inside the driver, that tracks all the 
> decommissioned nodes and takes the necessary actions and state transitions.
> 3) Based on the decommissioned node list, add hooks in the code to achieve:
>  a) No new tasks on the executors
>  b) Removal of the shuffle data mapping info for the node to be 
> decommissioned from the MapOutputTracker
>  c) Not counting fetch failures from decommissioned nodes towards stage 
> failure (see the sketch after this list)
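> A hedged sketch of hook (c); the helper and its isDecommissioned lookup stand 
> in for the proposed DecommissionTracker and are hypothetical, not existing 
> Spark code:
>
> object FetchFailureAccounting {
>   // Decide whether a FetchFailed coming from `host` should count towards
>   // spark.stage.maxConsecutiveAttempts. `isDecommissioned` stands in for a
>   // lookup into the proposed DecommissionTracker's node list.
>   def countsTowardsStageFailure(host: String,
>                                 isDecommissioned: String => Boolean): Boolean =
>     !isDecommissioned(host)
> }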
> On receiving the info that a node is to be decommissioned, the below actions 
> need to be performed by the DecommissionTracker on the driver:
>  * Add an entry for the node in the DecommissionTracker with its termination 
> time and the node state "DECOMMISSIONING".
>  * Stop assigning any new tasks to executors on the nodes which are 
> candidates for decommissioning. This makes sure that, as tasks finish, the 
> usage of these nodes slowly dies down.
>  * Kill all the executors on the decommissioning nodes after a configurable 
> period of time, say "spark.graceful.decommission.executor.leasetimePct" (a 
> worked timing sketch follows this list). This killing ensures two things. 
> Firstly, the task failures will be attributed to the job failure count. 
> Secondly, it avoids generating more shuffle data on a node that will 
> eventually be lost. The node state is set to "EXECUTOR_DECOMMISSIONED". 
>  * Mark the shuffle data on the node as unavailable after 
> "spark.qubole.graceful.decommission.shuffedata.leasetimePct" time. This 
> ensures that recomputation of missing shuffle partitions is done early, 
> rather than reducers failing later with a time-consuming FetchFailure. The 
> node state is set to "SHUFFLE_DECOMMISSIONED". 
>  * Mark the node as terminated after the termination time. Now the state of 
> the node is "TERMINATED".
>  * Remove the node entry from the DecommissionTracker if the same host name 
> is reused (this is not uncommon in many public cloud environments).
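> A worked sketch of how these lease configurations might translate into 
> absolute deadlines, assuming the Pct values are percentages of the interval 
> between the decommission signal and the termination time (this interpretation 
> and the helper below are assumptions, not the actual implementation):
>
> // For an AWS spot node there are ~120s between the DECOMMISSIONING signal
> // and node loss, so:
> //   executor leasetimePct = 50     -> kill executors after ~60s
> //   shuffle-data leasetimePct = 90 -> unregister shuffle data after ~108s
> def deadlineMs(signalTimeMs: Long, terminationTimeMs: Long,
>                leasetimePct: Int): Long =
>   signalTimeMs + (terminationTimeMs - signalTimeMs) * leasetimePct / 100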
> This is the life cycle of a node which is decommissioned:
> DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> 
> TERMINATED.
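> A sketch of how the proposed lifecycle might look in code (all names are 
> hypothetical; no such class exists in Spark today):
>
> // Proposed node states, in the order a decommissioning node moves through them.
> sealed trait NodeDecommissionState
> case object Decommissioning extends NodeDecommissionState          // RM signalled loss
> case object ExecutorDecommissioned extends NodeDecommissionState   // executors killed
> case object ShuffleDataDecommissioned extends NodeDecommissionState // shuffle data unregistered
> case object Terminated extends NodeDecommissionState               // node gone
>
> // Per-host record the (proposed) DecommissionTracker would keep: the
> // termination time reported by the RM and the current state, advanced by
> // timers derived from the leasetimePct deadlines above.
> case class NodeDecommissionInfo(terminationTimeMs: Long,
>                                 var state: NodeDecommissionState)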
> *Why do we decommission the executors before decommissioning the shuffle 
> data?* There are two reasons why we exit the executors before the shuffle 
> data:
> a) As per the current logic, whenever we receive the node decommissioning 
> notification we stop assigning new tasks to the executors running on that 
> node. We give the tasks already running on those executors some time to 
> complete before killing the executors. If we keep the executors running till 
> the end, there is a chance of generating more shuffle data which will 
> eventually be lost, triggering a recompute in the future. This approach 
> minimises the recomputation of shuffle data and maximises the usage of the 
> shuffle data on the node by keeping it available till the end.
> b) We want to keep the shuffle data until the node is about to be lost, so 
> that any tasks that depend on that shuffle data can complete, and we don't 
> have to recompute the shuffle data if no task requires it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
