[ https://issues.apache.org/jira/browse/SPARK-20832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-20832:
-------------------------------
    Description: 
In SPARK-17370 (a patch authored by [~ekhliang] and reviewed by me), we added 
logic to the DAGScheduler to mark external shuffle service instances as 
unavailable upon task failure when the task failure reason was "SlaveLost" and 
this was known to be caused by worker death. If the Spark Master discovered 
that a worker was dead, it would notify any drivers with executors on that 
worker to mark those executors as dead. The linked patch simply piggybacked on 
this logic to have the executor death notification also imply worker death and 
to have worker-death-caused-executor-death imply shuffle file loss.
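
Roughly, that piggybacking boils down to the following simplified sketch (the 
type and method names here are illustrative stand-ins, not the actual 
DAGScheduler / MapOutputTracker internals):

{code:scala}
// Simplified, illustrative sketch of the SPARK-17370 behavior described above;
// names only approximate the real Spark internals.
sealed trait ExecutorLossReason
case class SlaveLost(message: String, workerLost: Boolean = false)
  extends ExecutorLossReason

trait MapOutputTrackerSketch {
  def removeOutputsOnHost(host: String): Unit // hypothetical helper
}

class DAGSchedulerSketch(mapOutputTracker: MapOutputTrackerSketch) {
  /** Invoked when the scheduler learns that an executor on `host` was lost. */
  def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
    val shuffleFilesLost = reason match {
      // Worker death implies the worker-embedded shuffle service died too, so
      // every map output registered against that host must be treated as lost.
      case SlaveLost(_, true) => true
      // Any other executor loss leaves the external shuffle service running,
      // so its shuffle files are assumed to still be fetchable.
      case _ => false
    }
    if (shuffleFilesLost) {
      mapOutputTracker.removeOutputsOnHost(host)
    }
  }
}
{code}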

However, there are modes of external shuffle service loss which this mechanism 
does not detect, leaving the system prone to race conditions. Consider the 
following:

* Spark standalone is configured to run an external shuffle service embedded in 
the Worker (see the configuration sketch after this list).
* Application has shuffle outputs and executors on Worker A.
* A stage that depends on outputs of tasks that ran on Worker A starts.
* All executors on Worker A are removed, either because they died with 
exceptions or because they were scaled down via the dynamic allocation APIs, 
but _not_ due to worker death. Worker A is still healthy at this point.
* At this point the MapOutputTracker still records map output locations on 
Worker A's shuffle service. This is expected behavior. 
* Worker A dies at an instant where the application has no executors running on 
it.
* The Master knows that Worker A died but does not inform the driver (which had 
no executors on that worker at the time of its death).
* Some task from the running stage attempts to fetch map outputs from Worker A 
but these requests time out because Worker A's shuffle service isn't available.
* Due to other logic in the scheduler, these preventable FetchFailures don't 
wind up invalidating the now-unavailable map output locations (this is a 
distinct bug / behavior which I'll discuss in a separate JIRA ticket).
* This behavior leads to several unsuccessful stage reattempts and ultimately 
to a job failure.
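
For concreteness, the worker-embedded shuffle service setup assumed by the 
first bullet corresponds to something like the following minimal configuration 
sketch (real config keys; dynamic allocation is included only because the 
scenario mentions it as one way executors go away):

{code:scala}
import org.apache.spark.SparkConf

object StandaloneShuffleServiceConfSketch {
  // Worker-embedded external shuffle service, plus (optionally) dynamic
  // allocation, matching the scenario in the list above.
  val conf: SparkConf = new SparkConf()
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.enabled", "true")
}
{code}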

A simple way to address this would be to have the Master explicitly notify 
drivers of all Worker deaths, even if those drivers don't currently have 
executors on the affected worker. The Spark Standalone scheduler backend can 
receive the explicit WorkerLost message and bubble up the right calls to the 
task scheduler and DAGScheduler to invalidate map output locations served by 
the now-dead external shuffle service.
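
To make the shape of that concrete, here's a rough sketch of the proposed 
notification path (aside from the WorkerLost message mentioned above, every 
name below is a placeholder rather than an existing Spark API):

{code:scala}
// Illustrative sketch of the proposed flow; names are placeholders.
case class WorkerLost(workerId: String, host: String, message: String)

trait ShuffleOutputInvalidator {
  // In the real fix this would bubble up through the task scheduler and
  // DAGScheduler to drop all map outputs registered against `host`.
  def invalidateOutputsOnHost(host: String): Unit
}

// Master side: on worker death, notify every registered application driver,
// even drivers that currently have no executors on the dead worker.
class MasterSketch(notifyDrivers: Seq[WorkerLost => Unit]) {
  def onWorkerDeath(workerId: String, host: String): Unit =
    notifyDrivers.foreach(_.apply(WorkerLost(workerId, host, "worker lost")))
}

// Driver side: the standalone scheduler backend receives the explicit message
// and invalidates map output locations served by that host's shuffle service.
class StandaloneBackendSketch(invalidator: ShuffleOutputInvalidator) {
  def receive(msg: WorkerLost): Unit =
    invalidator.invalidateOutputsOnHost(msg.host)
}
{code}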

This relates to SPARK-20115 in the sense that both tickets aim to address 
issues where the external shuffle service is unavailable. The key difference is 
the mechanism for detection: SPARK-20115 marks the external shuffle service as 
unavailable whenever any fetch failure occurs against it, whereas the proposal 
here relies on more explicit signals. This JIRA ticket's proposal is scoped 
only to Spark Standalone mode. As a compromise, we might be able to consider 
marking "all of a single shuffle's outputs on a single external shuffle 
service" as lost following a fetch failure (to be discussed in a separate 
JIRA).
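
A minimal sketch of that compromise heuristic, purely for illustration (the 
per-shuffle index and method names below are assumptions of this sketch, not 
the existing MapOutputTracker API):

{code:scala}
import scala.collection.mutable

// On a fetch failure for (shuffleId, host), drop only that shuffle's outputs
// registered on that host, leaving other shuffles' outputs on the host alone.
class PerShuffleOutputIndexSketch {
  // shuffleId -> host -> map partition ids whose output lives on that host
  private val outputs =
    mutable.Map.empty[Int, mutable.Map[String, mutable.Set[Int]]]

  def register(shuffleId: Int, host: String, mapId: Int): Unit = {
    outputs.getOrElseUpdate(shuffleId, mutable.Map.empty)
      .getOrElseUpdate(host, mutable.Set.empty) += mapId
  }

  /** Invoked when a fetch of `shuffleId`'s data from `host` fails. */
  def onFetchFailure(shuffleId: Int, host: String): Unit = {
    outputs.get(shuffleId).foreach(_.remove(host))
  }
}
{code}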

> Standalone master should explicitly inform drivers of worker deaths and 
> invalidate external shuffle service outputs
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20832
>                 URL: https://issues.apache.org/jira/browse/SPARK-20832
>             Project: Spark
>          Issue Type: Bug
>          Components: Deploy, Scheduler
>    Affects Versions: 2.0.0
>            Reporter: Josh Rosen
>


