Josh Rosen created SPARK-27736:
----------------------------------

             Summary: Improve handling of FetchFailures caused by 
ExternalShuffleService losing track of executor registrations
                 Key: SPARK-27736
                 URL: https://issues.apache.org/jira/browse/SPARK-27736
             Project: Spark
          Issue Type: Bug
          Components: Shuffle
    Affects Versions: 2.4.0
            Reporter: Josh Rosen


I have discovered a fault-tolerance edge case which can cause Spark jobs to 
fail if a single external shuffle service process restarts and fails to recover 
its list of registered executors (something that can happen on YARN when 
NodeManager recovery is disabled).

I believe this problem can be worked around today with a configuration change, 
but I'm filing this issue to (a) better document the problem, and (b) propose 
either a change of default configuration or additional DAGScheduler logic to 
better handle this failure mode.
h2. Problem description

The external shuffle service process is _mostly_ stateless except for a map 
tracking the set of registered applications and executors.

When processing a shuffle fetch request, the shuffle service first checks 
whether the executor that owns the requested block ID is registered; if it is 
not, the shuffle service throws an exception like 
{code:java}
java.lang.RuntimeException: Executor is not registered 
(appId=application_1557557221330_6891, execId=428){code}
and this exception surfaces as a {{FetchFailed}} error in the executor 
requesting the shuffle block.
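
For reference, the registration check looks roughly like the following (a simplified Scala sketch loosely mirroring the behavior of {{ExternalShuffleBlockResolver}}, which is actually written in Java; names and types here are illustrative, not the real implementation):
{code:scala}
import java.util.concurrent.ConcurrentHashMap

// Simplified sketch of the external shuffle service's only meaningful
// in-memory state: a map of registered executors. If the process restarts
// without recovering this map, every fetch for a pre-restart executor fails
// even though the shuffle files are probably still on disk.
object ShuffleServiceSketch {
  private case class AppExecId(appId: String, execId: String)
  private case class ExecutorShuffleInfo(localDirs: Seq[String])

  private val executors = new ConcurrentHashMap[AppExecId, ExecutorShuffleInfo]()

  def registerExecutor(appId: String, execId: String, localDirs: Seq[String]): Unit = {
    executors.put(AppExecId(appId, execId), ExecutorShuffleInfo(localDirs))
  }

  def getBlockData(appId: String, execId: String, blockId: String): Array[Byte] = {
    val info = executors.get(AppExecId(appId, execId))
    if (info == null) {
      // This is the error that the fetching executor eventually reports as a
      // FetchFailed.
      throw new RuntimeException(
        s"Executor is not registered (appId=$appId, execId=$execId)")
    }
    readBlockFromDisk(info, blockId)
  }

  // Elided: resolve blockId against the executor's registered local dirs.
  private def readBlockFromDisk(info: ExecutorShuffleInfo, blockId: String): Array[Byte] =
    Array.emptyByteArray
}
{code}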

In normal operation this error should not occur because executors shouldn't be 
mis-routing shuffle fetch requests. However, this _can_ happen if the shuffle 
service crashes and restarts, causing it to lose its in-memory executor 
registration state. With YARN this state can be recovered from disk if YARN 
NodeManager recovery is enabled (using the mechanism added in SPARK-9439), but 
I don't believe that we perform state recovery in Standalone and Mesos modes 
(see SPARK-24223).

If state cannot be recovered then map outputs cannot be served (even though the 
files probably still exist on disk). In theory, this shouldn't cause Spark jobs 
to fail because we can always redundantly recompute lost / unfetchable map 
outputs.

However, in practice this can cause total job failures in deployments where the 
node with the failed shuffle service was running a large number of executors: 
by default, the DAGScheduler unregisters map outputs _only from the individual 
executor whose shuffle blocks could not be fetched_ (see 
[code|https://github.com/apache/spark/blame/bfb3ffe9b33a403a1f3b6f5407d34a477ce62c85/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1643]),
 so it can take several rounds of failed stage attempts to discover and clear 
the outputs of every executor on the faulty host. If the number of executors on 
a host exceeds the stage retry limit, this can exhaust stage retry attempts and 
fail the job (e.g. with the default limit of 4 consecutive stage attempts, a 
host that was running 8 executors cannot be fully cleared before the stage is 
aborted).

This "multiple rounds of recomputation to discover all failed executors on a 
host" problem was addressed by SPARK-19753, which added a 
{{spark.files.fetchFailure.unRegisterOutputOnHost}} configuration which 
promotes executor fetch failures into host-wide fetch failures (clearing output 
from all neighboring executors upon a single failure). However, that 
configuration is {{false}} by default.
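
To make the current behavior concrete, here is a paraphrased, self-contained sketch of how the DAGScheduler reacts to a {{FetchFailed}} (the method and config names mirror the real code linked above, but this is not the actual implementation):
{code:scala}
// Paraphrased sketch of the DAGScheduler's FetchFailed handling (Spark 2.4).
object FetchFailureHandlingSketch {

  // spark.files.fetchFailure.unRegisterOutputOnHost (default: false)
  val unRegisterOutputOnHostOnFetchFailure: Boolean = false
  val externalShuffleServiceEnabled: Boolean = true

  def handleFetchFailed(failedExecId: String, failedHost: String): Unit = {
    val hostToUnregisterOutputs: Option[String] =
      if (externalShuffleServiceEnabled && unRegisterOutputOnHostOnFetchFailure) {
        // Host-wide invalidation: a single fetch failure clears the map
        // outputs of every executor on the host.
        Some(failedHost)
      } else {
        // Default: only the one executor named in the FetchFailed is cleared,
        // so a host running N executors can need up to N failed stage
        // attempts before all of its stale outputs are gone.
        None
      }
    removeExecutorAndUnregisterOutputs(failedExecId, hostToUnregisterOutputs)
  }

  // Stub standing in for the real DAGScheduler method of the same name.
  private def removeExecutorAndUnregisterOutputs(
      execId: String,
      hostToUnregisterOutputs: Option[String]): Unit = {
    hostToUnregisterOutputs match {
      case Some(host) => println(s"Unregistering all map outputs on host $host")
      case None => println(s"Unregistering map outputs of executor $execId only")
    }
  }
}
{code}
With today's defaults this always takes the per-executor branch; setting {{spark.files.fetchFailure.unRegisterOutputOnHost=true}} flips it to host-wide invalidation.
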
h2. Potential solutions

I have a few ideas about how we can improve this situation:
 - Update the [YARN external shuffle service 
documentation|https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service]
 to recommend enabling node manager recovery.
 - Consider defaulting {{spark.files.fetchFailure.unRegisterOutputOnHost}} to 
{{true}}. This would improve out-of-the-box resiliency for large clusters. The 
trade-off here is a reduction of efficiency in case there are transient "false 
positive" fetch failures, but I suspect this case may be unlikely in practice 
(so the change of default could be an acceptable trade-off). See [prior 
discussion on 
GitHub|https://github.com/apache/spark/pull/18150#discussion_r119736751].
 - Modify DAGScheduler to add special-case handling for "Executor is not 
registered" exceptions that trigger FetchFailures: if we see this exception 
then it implies that the shuffle service failed to recover its state, which in 
turn implies that all of its prior outputs are effectively unavailable. In this 
case, it _might_ be safe to unregister all of the host's outputs irrespective 
of whether the {{unRegisterOutputOnHost}} flag is set (a rough sketch of this 
idea follows the list).
 -- This might require us to string-match on exceptions (so we can be 
backwards-compatible with old shuffle services, freeing users from needing to 
upgrade / restart NMs to pick up this fix).
 -- I suppose there's the potential for race conditions where the shuffle 
service restarts and produces _new_ map outputs from freshly-registered 
executors, only for us to turn around and unnecessarily clear those outputs as 
part of the cleanup of the pre-shuffle-service-restart outputs. If we assume 
shuffle service and executor deaths are coupled (i.e. that death of the shuffle 
service process implies death of all executors, something which I believe is 
true of both YARN NM and Standalone Worker death) then we could be a bit more 
precise and invalidate outputs from all _dead_ executors on that host.
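
A rough sketch of that special case (purely illustrative; the string-matching and the ability to enumerate dead executors per host are assumptions about how this could be wired into the DAGScheduler, not existing Spark code):
{code:scala}
// Illustrative sketch of the proposed special-case handling; not existing
// Spark code. Assumes the FetchFailed error message reaches the DAGScheduler
// and that we can enumerate dead executors per host.
object LostRegistrationFetchFailureSketch {

  // String-matching keeps this backwards-compatible with shuffle services
  // that have not been upgraded or restarted.
  private val LostRegistrationMarker = "Executor is not registered"

  /** Returns the executors whose map outputs should be unregistered. */
  def executorsToInvalidate(
      failedExecId: String,
      failedHost: String,
      fetchFailureMessage: String,
      deadExecutorsOnHost: String => Seq[String]): Seq[String] = {
    if (fetchFailureMessage.contains(LostRegistrationMarker)) {
      // The shuffle service lost its registration state, so outputs from all
      // executors that died before its restart are unfetchable. Restricting
      // the invalidation to *dead* executors avoids clobbering outputs from
      // executors that registered after the service came back up.
      (deadExecutorsOnHost(failedHost) :+ failedExecId).distinct
    } else {
      // Fall back to today's per-executor behavior.
      Seq(failedExecId)
    }
  }
}
{code}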

I'm open to other suggestions, too.

/cc [~squito] [~sitalke...@gmail.com] [~tgraves] [~vanzin] as FYI (since I 
think you've all worked on relevant parts of this code).


