[ https://issues.apache.org/jira/browse/SPARK-27736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16960244#comment-16960244 ]

feiwang edited comment on SPARK-27736 at 10/26/19 2:30 AM:
-----------------------------------------------------------

Hi, we hit this issue recently.
[~joshrosen] [~tgraves]
How about implementing a simple solution:
* let the externalShuffleClient query whether an executor is registered in the ESS
* when a FetchFailedException is thrown, check whether that executor is still registered in the ESS
* if it is not, remove the map outputs of all executors on that host that are not registered (see the rough sketch below)

If this sounds OK, I can implement it.
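
A rough sketch of the idea (the registration query is hypothetical; the real ExternalShuffleClient does not expose one today, so the trait below is only a stand-in):
{code:scala}
// Hypothetical query answered by the external shuffle service; not an existing API.
trait ShuffleServiceRegistry {
  def isExecutorRegistered(host: String, execId: String): Boolean
}

// On a FetchFailed from `failedExecId` on `host`: if that executor is no longer
// registered in the ESS, assume the service lost its registration state and
// invalidate every unregistered executor's outputs on the host, not just the
// one whose blocks could not be fetched.
def executorsToUnregister(
    registry: ShuffleServiceRegistry,
    host: String,
    failedExecId: String,
    executorsOnHost: Seq[String]): Seq[String] = {
  if (registry.isExecutorRegistered(host, failedExecId)) {
    Seq(failedExecId) // normal case: only the failing executor
  } else {
    executorsOnHost.filterNot(registry.isExecutorRegistered(host, _))
  }
}
{code}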


was (Author: hzfeiwang):
Hi, we hit this issue recently.
[~joshrosen] [~tgraves]
How about implementing a simple solution:
* let the externalShuffleClient query whether an executor is registered in the ESS
* when removing an executor, check whether it is registered in the ESS
* if it is not, remove the map outputs of all executors on that host that are not registered

If this sounds OK, I can implement it.

> Improve handling of FetchFailures caused by ExternalShuffleService losing 
> track of executor registrations
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27736
>                 URL: https://issues.apache.org/jira/browse/SPARK-27736
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Minor
>
> This ticket describes a fault-tolerance edge-case which can cause Spark jobs 
> to fail if a single external shuffle service process reboots and fails to 
> recover the list of registered executors (something which can happen when 
> using YARN if NodeManager recovery is disabled) _and_ the Spark job has a 
> large number of executors per host.
> I believe this problem can be worked around today via a change of 
> configurations, but I'm filing this issue to (a) better document this 
> problem, and (b) propose either a change of default configurations or 
> additional DAGScheduler logic to better handle this failure mode.
> h2. Problem description
> The external shuffle service process is _mostly_ stateless except for a map 
> tracking the set of registered applications and executors.
> When processing a shuffle fetch request, the shuffle service first checks 
> whether the requested block ID's executor is registered; if it's not 
> registered then the shuffle service throws an exception like 
> {code:java}
> java.lang.RuntimeException: Executor is not registered 
> (appId=application_1557557221330_6891, execId=428){code}
> and this exception becomes a {{FetchFailed}} error in the executor requesting 
> the shuffle block.
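> The check is essentially a lookup in that in-memory map; if the shuffle 
> service process restarts and the map comes back empty, every fetch for a 
> previously-registered executor fails. A rough illustration (in Scala for 
> brevity; the real resolver lives in the Java network-shuffle module, so treat 
> the names below as paraphrase rather than actual source):
> {code:scala}
> import scala.collection.concurrent.TrieMap
> 
> // In-memory registration state; lost if the shuffle service process restarts.
> final case class AppExecId(appId: String, execId: String)
> val registeredExecutors = TrieMap.empty[AppExecId, String] // value: executor's local dirs, etc.
> 
> // Per-fetch registration check: an unknown executor => RuntimeException,
> // which surfaces as a FetchFailed in the requesting executor.
> def lookupExecutor(appId: String, execId: String): String =
>   registeredExecutors.getOrElse(
>     AppExecId(appId, execId),
>     throw new RuntimeException(
>       s"Executor is not registered (appId=$appId, execId=$execId)"))
> {code}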
> In normal operation this error should not occur because executors shouldn't 
> be mis-routing shuffle fetch requests. However, this _can_ happen if the 
> shuffle service crashes and restarts, causing it to lose its in-memory 
> executor registration state. With YARN this state can be recovered from disk 
> if YARN NodeManager recovery is enabled (using the mechanism added in 
> SPARK-9439), but I don't believe that we perform state recovery in Standalone 
> and Mesos modes (see SPARK-24223).
> If state cannot be recovered then map outputs cannot be served (even though 
> the files probably still exist on disk). In theory, this shouldn't cause 
> Spark jobs to fail because we can always redundantly recompute lost / 
> unfetchable map outputs.
> However, in practice this can cause total job failures in deployments where 
> the node with the failed shuffle service was running a large number of 
> executors: by default, the DAGScheduler unregisters map outputs _only from the 
> individual executor whose shuffle blocks could not be fetched_ (see 
> [code|https://github.com/apache/spark/blame/bfb3ffe9b33a403a1f3b6f5407d34a477ce62c85/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1643]),
> so it can take several rounds of failed stage attempts before outputs from all 
> executors on the faulty host are cleared. If the number of executors on a 
> host is greater than the stage retry limit then this can exhaust stage retry 
> attempts and cause job failures.
> This "multiple rounds of recomputation to discover all failed executors on a 
> host" problem was addressed by SPARK-19753, which added a 
> {{spark.files.fetchFailure.unRegisterOutputOnHost}} configuration which 
> promotes executor fetch failures into host-wide fetch failures (clearing 
> output from all neighboring executors upon a single failure). However, that 
> configuration is {{false}} by default.
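> For clusters that want the host-wide behavior today, it is a one-line opt-in 
> (shown here on a {{SparkConf}} for illustration; the same key can also be set 
> in spark-defaults.conf):
> {code:scala}
> import org.apache.spark.SparkConf
> 
> // Promote a fetch failure against one executor into clearing the map
> // outputs of every executor on that host (off by default).
> val conf = new SparkConf()
>   .set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
> {code}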
> h2. Potential solutions
> I have a few ideas about how we can improve this situation:
>  - Update the [YARN external shuffle service 
> documentation|https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service]
>  to recommend enabling node manager recovery.
>  - Consider defaulting {{spark.files.fetchFailure.unRegisterOutputOnHost}} to 
> {{true}}. This would improve out-of-the-box resiliency for large clusters. 
> The trade-off here is a reduction of efficiency in case there are transient 
> "false positive" fetch failures, but I suspect this case may be unlikely in 
> practice (so the change of default could be an acceptable trade-off). See 
> [prior discussion on 
> GitHub|https://github.com/apache/spark/pull/18150#discussion_r119736751].
>  - Modify DAGScheduler to add special-case handling for "Executor is not 
> registered" exceptions that trigger FetchFailures: if we see this exception 
> then it implies that the shuffle service failed to recover state, implying 
> that all of its prior outputs are effectively unavailable. In this case, it 
> _might_ be safe to unregister all host outputs irrespective of whether the 
> {{unRegisterOutputOnHost}} flag is set (a rough sketch of this check follows 
> this list).
>  -- This might require us to string-match on exceptions (so we can be 
> backwards-compatible with old shuffle services, freeing users from needing to 
> upgrade / restart NMs to pick up this fix).
>  -- I suppose there's the potential for race conditions where the shuffle 
> service restarts and produces _new_ map outputs from freshly-registered 
> executors, only for us to turn around and unnecessarily clear those outputs 
> as part of the cleanup of the pre-shuffle-service-restart outputs. If we 
> assume shuffle service and executor deaths are coupled (i.e. that death of 
> the shuffle service process implies death of all executors, something which I 
> believe is true of both YARN NM and Standalone Worker death) then we could be 
> a bit more precise and invalidate outputs from all _dead_ executors on that 
> host.
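> A rough sketch of the string-matching special case from the third bullet above 
> (the helper below is hypothetical, not existing DAGScheduler code):
> {code:scala}
> // Decide whether a FetchFailed should escalate to host-wide unregistration.
> // Matching on the message text keeps this backwards-compatible with old
> // shuffle services, which already emit "Executor is not registered ...".
> def shouldUnregisterWholeHost(
>     fetchFailureMessage: String,
>     unRegisterOutputOnHost: Boolean): Boolean = {
>   unRegisterOutputOnHost ||
>     fetchFailureMessage.contains("Executor is not registered")
> }
> {code}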
> I'm open to other suggestions, too.
> /cc [~imranr] [~sitalke...@gmail.com] [~tgraves] [~vanzin] as FYI (since I 
> think you've all worked on relevant parts of this code).


