[ https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039884#comment-17039884 ]
Saurabh Chawla edited comment on SPARK-30873 at 2/20/20 3:43 AM:
-----------------------------------------------------------------
We have raised the WIP PR for this. cc [~holden] [~itskals] [~amargoor]

was (Author: saurabhc100):
We have raised the WIP PR for this. cc [~holdenkarau] [~itskals] [~amargoor]

> Handling Node Decommissioning for Yarn cluster manager in Spark
> ---------------------------------------------------------------
>
>                 Key: SPARK-30873
>                 URL: https://issues.apache.org/jira/browse/SPARK-30873
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, YARN
>    Affects Versions: 3.0.0
>            Reporter: Saurabh Chawla
>            Priority: Major
>
> In many public cloud environments, node loss (for example AWS Spot instances, AWS Spot Blocks, and GCP preemptible VMs) is a planned and informed activity.
> The cloud provider notifies the cluster manager about the upcoming loss of a node ahead of time. A few examples are listed here:
> a) Spot loss in AWS (2 minutes before the event)
> b) GCP preemptible VM loss (30 seconds before the event)
> c) AWS Spot Block loss with information on the termination time (generally a few tens of minutes before decommission, as configured in YARN)
> This JIRA aims to make Spark leverage the advance knowledge of future node loss and adjust the scheduling of tasks to minimise the impact on the application.
> It is well known that when a host is lost, its executors, their running tasks, their caches and their shuffle data are all lost. This can waste compute and other resources.
> The focus here is to build a framework for YARN that can be extended to other cluster managers to handle such scenarios.
> The framework must handle one or more of the following:
> 1) Prevent new tasks from starting on any executors on decommissioning nodes.
> 2) Decide whether to kill the running tasks so that they can be restarted elsewhere (assuming they will not complete within the deadline), or allow them to continue in the hope that they finish within the deadline.
> 3) Clear the shuffle data entries for the decommissioned node's hostname from the MapOutputTracker to prevent FetchFailed exceptions. The most significant advantage of unregistering the shuffle outputs up front is that when Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from decommissioned nodes and recovers in only one attempt. This speeds up the recovery process significantly over the default Spark implementation, where stages might be rescheduled multiple times to recompute missing shuffles from all nodes, and prevents jobs from being stuck for hours failing and recomputing.
> 4) Prevent a stage from aborting due to FetchFailed exceptions caused by the decommissioning of nodes. Spark allows only a limited number of consecutive stage attempts before a stage is aborted; this is controlled by the config spark.stage.maxConsecutiveAttempts. Not counting fetch failures caused by node decommissioning towards stage failure improves the reliability of the system (see the sketch right after this list).
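> As an illustration of point 4, here is a minimal, self-contained sketch of fetch-failure accounting that excludes failures originating from decommissioned hosts. All names are hypothetical and not taken from the WIP PR; only spark.stage.maxConsecutiveAttempts (default 4) is a real Spark config.
>
>   // Hypothetical sketch: stage-abort accounting that ignores FetchFailures
>   // coming from hosts that are known to be decommissioning.
>   class StageFailureTracker(
>       maxConsecutiveAttempts: Int = 4, // spark.stage.maxConsecutiveAttempts
>       isDecommissioned: String => Boolean) {
>
>     private var consecutiveFailures = 0
>
>     // Called when a stage attempt fails with a FetchFailed from `host`.
>     // Returns true if the stage should be aborted.
>     def onFetchFailure(host: String): Boolean = {
>       if (isDecommissioned(host)) {
>         false // planned node loss: do not count towards stage abort
>       } else {
>         consecutiveFailures += 1
>         consecutiveFailures >= maxConsecutiveAttempts
>       }
>     }
>
>     // Called when a stage attempt succeeds.
>     def onSuccess(): Unit = { consecutiveFailures = 0 }
>   }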
> Main components of the change:
> 1) Get the cluster-info update from the Resource Manager -> Application Master -> Spark Driver.
> 2) A DecommissionTracker, residing inside the driver, which tracks all the decommissioned nodes and performs the necessary actions and state transitions.
> 3) Based on the decommissioned-node list, add hooks in the code to achieve:
> a) No new tasks on executors on the node
> b) Removal of the shuffle data mapping info for the node to be decommissioned from the MapOutputTracker
> c) Not counting FetchFailures from decommissioned nodes towards stage failure
> On receiving the info that a node is to be decommissioned, the below actions are performed by the DecommissionTracker on the driver (a sketch of the state machine follows this list):
> * Add an entry for the node in the DecommissionTracker with its termination time and the node state "DECOMMISSIONING".
> * Stop assigning any new tasks to executors on the nodes which are candidates for decommission. This makes sure that, as tasks finish, the usage of these nodes slowly dies down.
> * Kill all the executors on the decommissioning nodes after a configurable period of time, say "spark.graceful.decommission.executor.leasetimePct". This killing ensures two things. First, the task failures will not be attributed to the job failure count. Second, it avoids generating more shuffle data on a node that will eventually be lost. The node state is set to "EXECUTOR_DECOMMISSIONED".
> * Mark the shuffle data on the node as unavailable after the "spark.qubole.graceful.decommission.shuffedata.leasetimePct" time. This ensures that recomputation of the missing shuffle partitions is done early, rather than reducers failing with a time-consuming FetchFailure. The node state is set to "SHUFFLE_DECOMMISSIONED".
> * Mark the node as terminated after the termination time. The state of the node is now "TERMINATED".
> * Remove the node's entry from the DecommissionTracker if the same hostname is reused (this is not uncommon in many public cloud environments).
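> The following is a minimal, self-contained sketch of the DecommissionTracker state machine described above. The four node states and the lease-percentage configs mirror the list; every class, method and parameter name is illustrative and not taken from the WIP PR.
>
>   import java.util.concurrent.ConcurrentHashMap
>
>   // Node states, in the order they are entered.
>   object NodeState extends Enumeration {
>     val Decommissioning, ExecutorDecommissioned,
>         ShuffleDecommissioned, Terminated = Value
>   }
>
>   // executorLeasePct / shuffleLeasePct stand in for
>   // spark.graceful.decommission.executor.leasetimePct and the
>   // corresponding shuffle-data lease config (fractions of the
>   // decommission-notice-to-termination window).
>   class DecommissionTracker(executorLeasePct: Double, shuffleLeasePct: Double) {
>
>     private case class NodeInfo(noticeAtMs: Long, terminateAtMs: Long,
>                                 var state: NodeState.Value)
>     private val nodes = new ConcurrentHashMap[String, NodeInfo]()
>
>     // Called when the cluster manager announces a future node loss.
>     def addNodeToDecommission(host: String, terminateAtMs: Long): Unit = {
>       nodes.put(host, NodeInfo(System.currentTimeMillis(), terminateAtMs,
>                                NodeState.Decommissioning))
>     }
>
>     // Scheduler hook: no new tasks once a node is decommissioning.
>     def canScheduleOn(host: String): Boolean = !nodes.containsKey(host)
>
>     // Driven periodically (e.g. from a scheduled thread); advances the
>     // node through its states as the lease fractions expire.
>     def tick(host: String, killExecutors: String => Unit,
>              unregisterShuffle: String => Unit): Unit = {
>       val info = nodes.get(host)
>       if (info == null) return
>       val now = System.currentTimeMillis()
>       val window = info.terminateAtMs - info.noticeAtMs
>       info.state match {
>         case NodeState.Decommissioning
>             if now >= info.noticeAtMs + (window * executorLeasePct).toLong =>
>           killExecutors(host) // stop producing new shuffle data on the node
>           info.state = NodeState.ExecutorDecommissioned
>         case NodeState.ExecutorDecommissioned
>             if now >= info.noticeAtMs + (window * shuffleLeasePct).toLong =>
>           unregisterShuffle(host) // trigger early recomputation instead of
>                                   // waiting for a slow FetchFailure
>           info.state = NodeState.ShuffleDecommissioned
>         case NodeState.ShuffleDecommissioned if now >= info.terminateAtMs =>
>           info.state = NodeState.Terminated
>         case _ => // nothing to do yet
>       }
>     }
>
>     // Drop a stale entry when the hostname is reused by a fresh node.
>     def removeNode(host: String): Unit = nodes.remove(host)
>   }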
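> And a short usage example for the sketch above (all values hypothetical):
>
>   object DecommissionTrackerExample {
>     def main(args: Array[String]): Unit = {
>       // Assume YARN reported that "node-1" terminates 120 seconds from now.
>       val tracker = new DecommissionTracker(executorLeasePct = 0.5,
>                                             shuffleLeasePct = 0.9)
>       tracker.addNodeToDecommission("node-1",
>         System.currentTimeMillis() + 120000L)
>
>       assert(!tracker.canScheduleOn("node-1")) // no new tasks from now on
>       // A scheduled thread would call tick() periodically; one manual step:
>       tracker.tick("node-1",
>         killExecutors = h => println(s"killing executors on $h"),
>         unregisterShuffle = h => println(s"unregistering shuffle on $h"))
>     }
>   }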