[ https://issues.apache.org/jira/browse/SPARK-27736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841593#comment-16841593 ]
Thomas Graves commented on SPARK-27736:
---------------------------------------

Yeah, we always ran YARN with NodeManager recovery on, but that doesn't help standalone mode unless you implement something similar. Either way, I think documenting it for YARN is a good idea.

We used to see transient fetch failures all the time because of temporary spikes in disk usage, so I would be hesitant to turn on spark.files.fetchFailure.unRegisterOutputOnHost by default; on the other hand, users could turn it back off too, so it depends on what people think is most common.

I don't think you can assume that the death of the shuffle service (the NM on YARN) implies the death of the executors. We have seen NodeManagers go down with an OOM while the executors stay up. Without the NM there, there isn't really anything to clean up the containers on that node. You will obviously get fetch failures from that node if it does go down.

Your last option seems like the best of those, but as you mention it could get a bit ugly with the string matching. The other thing you could do is start tracking those fetch failures and have the driver make a more informed decision based on them. This is work we had started at my previous employer but never had time to finish. It's a much bigger change, but it's really what we should be doing: it would allow us to make better decisions about blacklisting and to see whether it was the map or the reduce node that had issues, etc.

> Improve handling of FetchFailures caused by ExternalShuffleService losing track of executor registrations
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27736
>                 URL: https://issues.apache.org/jira/browse/SPARK-27736
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Minor
>
> This ticket describes a fault-tolerance edge case which can cause Spark jobs to fail if a single external shuffle service process reboots and fails to recover the list of registered executors (something which can happen when using YARN if NodeManager recovery is disabled) _and_ the Spark job has a large number of executors per host.
> I believe this problem can be worked around today via a change of configurations, but I'm filing this issue to (a) better document the problem, and (b) propose either a change of default configurations or additional DAGScheduler logic to better handle this failure mode.
> h2. Problem description
> The external shuffle service process is _mostly_ stateless, except for a map tracking the set of registered applications and executors.
> When processing a shuffle fetch request, the shuffle service first checks whether the requested block ID's executor is registered; if it is not registered then the shuffle service throws an exception like
> {code:java}
> java.lang.RuntimeException: Executor is not registered (appId=application_1557557221330_6891, execId=428){code}
> and this exception becomes a {{FetchFailed}} error in the executor requesting the shuffle block.
> In normal operation this error should not occur, because executors shouldn't be mis-routing shuffle fetch requests. However, this _can_ happen if the shuffle service crashes and restarts, causing it to lose its in-memory executor registration state. With YARN this state can be recovered from disk if YARN NodeManager recovery is enabled (using the mechanism added in SPARK-9439), but I don't believe that we perform state recovery in Standalone and Mesos modes (see SPARK-24223).
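> For illustration, here is a minimal Scala sketch of that in-memory registration map (simplified stand-ins, not the actual shuffle service code):
> {code:scala}
> import scala.collection.concurrent.TrieMap
>
> // Simplified model of the shuffle service's only real state: a map from
> // (appId, execId) to the information needed to locate that executor's files.
> final case class AppExecId(appId: String, execId: String)
> final case class ExecutorShuffleInfo(localDirs: Seq[String], subDirsPerLocalDir: Int, shuffleManager: String)
>
> class RegistrationState {
>   private val executors = TrieMap.empty[AppExecId, ExecutorShuffleInfo]
>
>   def registerExecutor(appId: String, execId: String, info: ExecutorShuffleInfo): Unit =
>     executors.put(AppExecId(appId, execId), info)
>
>   // Fails fast when the registration is missing -- e.g. because the shuffle
>   // service restarted without recovering its state -- producing the
>   // "Executor is not registered" error quoted above.
>   def getExecutorInfo(appId: String, execId: String): ExecutorShuffleInfo =
>     executors.getOrElse(AppExecId(appId, execId),
>       throw new RuntimeException(s"Executor is not registered (appId=$appId, execId=$execId)"))
> }
> {code}
> If that map is lost on a restart, every lookup for a pre-restart executor fails this way even though the shuffle files are still on disk.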
> If state cannot be recovered then map outputs cannot be served (even though the files probably still exist on disk). In theory, this shouldn't cause Spark jobs to fail because we can always redundantly recompute lost / unfetchable map outputs.
> However, in practice this can cause total job failures in deployments where the node with the failed shuffle service was running a large number of executors: by default, the DAGScheduler unregisters map outputs _only from the individual executor whose shuffle blocks could not be fetched_ (see [code|https://github.com/apache/spark/blame/bfb3ffe9b33a403a1f3b6f5407d34a477ce62c85/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1643]), so it can take several rounds of failed stage attempts to clear the output from all executors on the faulty host. If the number of executors on a host is greater than the stage retry limit then this can exhaust stage retry attempts and cause job failures.
> This "multiple rounds of recomputation to discover all failed executors on a host" problem was addressed by SPARK-19753, which added a {{spark.files.fetchFailure.unRegisterOutputOnHost}} configuration that promotes executor fetch failures into host-wide fetch failures (clearing output from all neighboring executors upon a single failure). However, that configuration is {{false}} by default.
> h2. Potential solutions
> I have a few ideas about how we can improve this situation:
> - Update the [YARN external shuffle service documentation|https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service] to recommend enabling NodeManager recovery.
> - Consider defaulting {{spark.files.fetchFailure.unRegisterOutputOnHost}} to {{true}}. This would improve out-of-the-box resiliency for large clusters. The trade-off here is a reduction of efficiency in case there are transient "false positive" fetch failures, but I suspect this case may be unlikely in practice (so the change of default could be an acceptable trade-off). See [prior discussion on GitHub|https://github.com/apache/spark/pull/18150#discussion_r119736751].
> - Modify the DAGScheduler to add special-case handling for "Executor is not registered" exceptions that trigger FetchFailures: if we see this exception then it implies that the shuffle service failed to recover state, implying that all of its prior outputs are effectively unavailable. In this case, it _might_ be safe to unregister all host outputs irrespective of whether the {{unRegisterOutputOnHost}} flag is set (a rough sketch of this check follows the list below).
> -- This might require us to string-match on exceptions (so we can be backwards-compatible with old shuffle services, freeing users from needing to upgrade / restart NMs to pick up this fix).
> -- I suppose there's the potential for race conditions where the shuffle service restarts and produces _new_ map outputs from freshly-registered executors, only for us to turn around and unnecessarily clear those outputs as part of the cleanup of the pre-shuffle-service-restart outputs. If we assume shuffle service and executor deaths are coupled (i.e. that death of the shuffle service process implies death of all executors, something which I believe is true of both YARN NM and Standalone Worker death) then we could be a bit more precise and invalidate outputs from all _dead_ executors on that host.
> I'm open to other suggestions, too.
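> To make the last option concrete, a rough sketch of the string-matching check (the object and method names are hypothetical, not existing Spark code):
> {code:scala}
> // Hypothetical helper: detects FetchFailed messages produced by a shuffle
> // service that has lost its executor registrations (e.g. after a restart
> // without state recovery).
> object ShuffleServiceLostStateHeuristic {
>   private val NotRegisteredMarker = "Executor is not registered"
>
>   def indicatesLostRegistrations(fetchFailureMessage: String): Boolean =
>     fetchFailureMessage != null && fetchFailureMessage.contains(NotRegisteredMarker)
> }
> {code}
> When such a check matches, the DAGScheduler could treat the failure as affecting the dead executors on that host even if {{spark.files.fetchFailure.unRegisterOutputOnHost}} is {{false}}; matching on the message string is what keeps this compatible with older shuffle services.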
>
> /cc [~imranr] [~sitalke...@gmail.com] [~tgraves] [~vanzin] as FYI (since I think you've all worked on relevant parts of this code).