Saurabh Chawla created SPARK-30873:
--------------------------------------

             Summary: Handling Node Decommissioning for Yarn cluster manager in 
Spark
                 Key: SPARK-30873
                 URL: https://issues.apache.org/jira/browse/SPARK-30873
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, YARN
    Affects Versions: 3.0.0
            Reporter: Saurabh Chawla


In many public cloud environments, node loss (in the case of AWS Spot loss, Spot 
blocks and GCP preemptible VMs) is a planned and informed activity. 
The cloud provider notifies the cluster manager about the possible loss of a 
node ahead of time. A few examples are listed here:
a) Spot loss in AWS (2 minutes before the event)
b) GCP preemptible VM loss (30 seconds before the event)
c) AWS Spot block loss with info on the termination time (generally a few tens 
of minutes before decommissioning, as configured in YARN)

This JIRA tries to make Spark leverage the knowledge of an upcoming node loss 
and adjust the scheduling of tasks to minimise the impact on the application.
It is well known that when a host is lost, its executors, their running tasks, 
their caches and also their shuffle data are lost. This can result in wasted 
compute and other resources.

The focus here is to build a framework for YARN that can be extended to other 
cluster managers to handle such scenarios.

The framework must handle one or more of the following:
1) Prevent new tasks from starting on any executors on decommissioning nodes.
2) Decide whether to kill the running tasks so that they can be restarted 
elsewhere (assuming they will not complete within the deadline), or allow them 
to continue in the hope that they will finish within the deadline.
3) Clear the shuffle data entries for the decommissioned node's hostname from 
the MapOutputTracker to prevent shuffle FetchFailed exceptions. The most 
significant advantage of unregistering the shuffle outputs is that when Spark 
schedules the first re-attempt to compute the missing blocks, it notices all of 
the missing blocks from decommissioned nodes and recovers them in a single 
attempt. This speeds up the recovery process significantly over the standard 
Spark implementation, where stages might be rescheduled multiple times to 
recompute missing shuffle outputs from all the nodes, and prevents jobs from 
being stuck for hours failing and recomputing.
4) Prevent the stage from aborting due to FetchFailed exceptions in the case of 
node decommissioning. Spark allows a number of consecutive stage attempts before 
a stage is aborted; this is controlled by the config 
spark.stage.maxConsecutiveAttempts. Not counting fetch failures due to 
decommissioning of nodes towards stage failure improves the reliability of the 
system (see the sketch after this list).
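To make point 4 concrete, here is a minimal, illustrative sketch of how 
fetch-failure accounting could exclude decommissioned hosts. The object and 
method names are assumptions made for this sketch and are not existing Spark 
APIs; only spark.stage.maxConsecutiveAttempts is an actual Spark config.

{code:scala}
import scala.collection.mutable

// Illustrative sketch only, not an existing Spark API. Tracks hosts known to be
// decommissioning and excludes their fetch failures from the consecutive-attempt
// count that spark.stage.maxConsecutiveAttempts limits.
object FetchFailureAccounting {

  private val decommissionedHosts = mutable.Set[String]()

  def markDecommissioned(host: String): Unit = synchronized {
    decommissionedHosts += host
  }

  def isDecommissioned(host: String): Boolean = synchronized {
    decommissionedHosts.contains(host)
  }

  // A stage aborts only after too many countable consecutive failed attempts.
  // Attempts whose FetchFailed came from a decommissioned host are not counted.
  def shouldAbortStage(failedAttemptHosts: Seq[String],
                       maxConsecutiveAttempts: Int): Boolean = {
    val countable = failedAttemptHosts.count(h => !isDecommissioned(h))
    countable >= maxConsecutiveAttempts
  }
}
{code}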

Main components of change
1) Get the ClusterInfo update from the Resource Manager -> Application Master 
-> Spark Driver.
2) A DecommissionTracker, residing inside the driver, tracks all the 
decommissioned nodes and performs the necessary actions and state transitions 
(a skeleton of the tracker interface follows this list).
3) Based on the decommissioned node list, add hooks in the code to:
 a) schedule no new tasks on executors of those nodes
 b) remove the shuffle data mapping info for the node being decommissioned from 
the MapOutputTracker
 c) not count FetchFailures from decommissioned nodes towards stage failure
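A rough skeleton of the driver-side DecommissionTracker interface implied by the 
components above; the trait, method and state names are assumptions made for 
illustration, not an agreed API.

{code:scala}
// Illustrative skeleton only; names are assumptions based on the description
// above, not an existing Spark interface.
object NodeState extends Enumeration {
  val Decommissioning, ExecutorDecommissioned, ShuffleDecommissioned, Terminated = Value
}

trait DecommissionTracker {
  // 1) Fed by the cluster-info update path RM -> AM -> Spark driver.
  def addNodeToDecommission(hostname: String, terminationTimeMs: Long): Unit

  // 3a) Scheduler hook: no new tasks on executors of decommissioning nodes.
  def isNodeDecommissioning(hostname: String): Boolean

  // 3b) Hook to drop the node's shuffle outputs from the MapOutputTracker.
  def decommissionShuffleData(hostname: String): Unit

  // 3c) Hook so FetchFailures from decommissioned nodes are not counted
  //     towards stage failure.
  def isFetchFailureFromDecommissionedNode(hostname: String): Boolean

  // Forget the old entry if the cloud provider reuses the hostname.
  def removeNode(hostname: String): Unit
}
{code}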

On receiving the info that a node is to be decommissioned, the below actions 
need to be performed by the DecommissionTracker on the driver (a timing sketch 
follows this list):
 * Add an entry for the node in the DecommissionTracker with its termination 
time and the node state "DECOMMISSIONING".
 * Stop assigning any new tasks to executors on the nodes that are candidates 
for decommissioning. This makes sure that, as the running tasks finish, usage 
of the node slowly dies down.
 * Kill all the executors on the decommissioning nodes after a configurable 
period of time, say "spark.graceful.decommission.executor.leasetimePct". This 
killing ensures two things: first, the task failures are accounted for in the 
job failure count; second, it avoids generating more shuffle data on a node 
that will eventually be lost. The node state is set to "EXECUTOR_DECOMMISSIONED". 
 * Mark the shuffle data on the node as unavailable after the 
"spark.graceful.decommission.shuffledata.leasetimePct" time. This ensures that 
recomputation of the missing shuffle partitions is done early, rather than 
reducers failing later with a time-consuming FetchFailure. The node state is set 
to "SHUFFLE_DECOMMISSIONED". 
 * Mark the node as terminated after the termination time. Now the state of the 
node is "TERMINATED".
 * Remove the node entry from the DecommissionTracker if the same hostname is 
reused (this is not uncommon in many public cloud environments).
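Purely to illustrate how the lease-time percentages above could translate into 
concrete transition times, here is a small sketch. The config names follow the 
description; the interpretation (percentage of the remaining time until 
termination) and the default values are assumptions, not proposed settings.

{code:scala}
// Illustrative sketch: the "leasetimePct" configs are read as a percentage of
// the remaining time until node termination. Names and defaults are assumptions.
final case class DecommissionSchedule(
    executorKillTimeMs: Long,        // node moves to EXECUTOR_DECOMMISSIONED
    shuffleDecommissionTimeMs: Long, // node moves to SHUFFLE_DECOMMISSIONED
    terminationTimeMs: Long)         // node moves to TERMINATED

object DecommissionSchedule {
  // executorLeasePct ~ spark.graceful.decommission.executor.leasetimePct
  // shuffleLeasePct  ~ spark.graceful.decommission.shuffledata.leasetimePct
  def compute(nowMs: Long,
              terminationTimeMs: Long,
              executorLeasePct: Int = 50,
              shuffleLeasePct: Int = 90): DecommissionSchedule = {
    val window = math.max(0L, terminationTimeMs - nowMs)
    DecommissionSchedule(
      executorKillTimeMs = nowMs + window * executorLeasePct / 100,
      shuffleDecommissionTimeMs = nowMs + window * shuffleLeasePct / 100,
      terminationTimeMs = terminationTimeMs)
  }
}
{code}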


