[ https://issues.apache.org/jira/browse/SPARK-27736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16960244#comment-16960244 ]
feiwang edited comment on SPARK-27736 at 10/26/19 2:30 AM:
-----------------------------------------------------------

Hi, we ran into this issue recently. [~joshrosen] [~tgraves] How about implementing a simple solution (sketched below):
* Let ExternalShuffleClient query whether an executor is registered with the ESS.
* When a FetchFailedException is thrown, check whether that executor is registered with the ESS.
* If it is not, remove all outputs of the executors on that host that are not registered.

If that is OK, I can implement it.

was (Author: hzfeiwang):
Hi, we ran into this issue recently. [~joshrosen] [~tgraves] How about implementing a simple solution:
* Let ExternalShuffleClient query whether an executor is registered with the ESS.
* When removing an executor, check whether it is registered with the ESS.
* If it is not, remove all outputs of the executors on that host that are not registered.

If that is OK, I can implement it.
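A minimal sketch of the proposal above. Everything here is illustrative: today's {{ExternalShuffleClient}} has no registration-query RPC (one would have to be added to the shuffle service protocol), and the two traits are stand-ins for the shuffle client and {{MapOutputTrackerMaster}}:
{code:scala}
// Hypothetical interface, for illustration only: a registration-query
// RPC does not exist on ExternalShuffleClient today and would have to
// be added to the external shuffle service protocol.
trait RegistrationAwareShuffleClient {
  def isExecutorRegistered(host: String, port: Int, execId: String): Boolean
}

// Stand-in for MapOutputTrackerMaster's per-executor unregistration.
trait OutputTracker {
  def removeOutputsOnExecutor(execId: String): Unit
}

// On a FetchFailedException from `host`: ask the ESS which of that host's
// executors it still knows about, and unregister the map outputs of every
// executor it has forgotten (i.e. registrations lost on an ESS restart).
def onFetchFailed(
    client: RegistrationAwareShuffleClient,
    tracker: OutputTracker,
    host: String,
    port: Int,
    executorsOnHost: Seq[String]): Unit = {
  executorsOnHost
    .filterNot(execId => client.isExecutorRegistered(host, port, execId))
    .foreach(tracker.removeOutputsOnExecutor)
}
{code}
Checking all executors on the host in one pass avoids the repeated rounds of stage retries described in the issue below.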
> Improve handling of FetchFailures caused by ExternalShuffleService losing track of executor registrations
> ----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-27736
> URL: https://issues.apache.org/jira/browse/SPARK-27736
> Project: Spark
> Issue Type: Bug
> Components: Shuffle
> Affects Versions: 2.4.0
> Reporter: Josh Rosen
> Priority: Minor
>
> This ticket describes a fault-tolerance edge case which can cause Spark jobs to fail if a single external shuffle service process reboots and fails to recover the list of registered executors (something which can happen when using YARN if NodeManager recovery is disabled) _and_ the Spark job has a large number of executors per host.
>
> I believe this problem can be worked around today via a change of configurations, but I'm filing this issue to (a) better document this problem, and (b) propose either a change of default configurations or additional DAGScheduler logic to better handle this failure mode.
>
> h2. Problem description
>
> The external shuffle service process is _mostly_ stateless except for a map tracking the set of registered applications and executors.
>
> When processing a shuffle fetch request, the shuffle service first checks whether the requested block ID's executor is registered; if it is not registered, then the shuffle service throws an exception like
> {code:java}
> java.lang.RuntimeException: Executor is not registered (appId=application_1557557221330_6891, execId=428){code}
> and this exception becomes a {{FetchFailed}} error in the executor requesting the shuffle block.
>
> In normal operation this error should not occur, because executors shouldn't be mis-routing shuffle fetch requests. However, this _can_ happen if the shuffle service crashes and restarts, causing it to lose its in-memory executor registration state. With YARN this state can be recovered from disk if YARN NodeManager recovery is enabled (using the mechanism added in SPARK-9439), but I don't believe that we perform state recovery in Standalone and Mesos modes (see SPARK-24223).
>
> If state cannot be recovered then map outputs cannot be served (even though the files probably still exist on disk). In theory, this shouldn't cause Spark jobs to fail because we can always redundantly recompute lost / unfetchable map outputs.
>
> However, in practice this can cause total job failures in deployments where the node with the failed shuffle service was running a large number of executors: by default, the DAGScheduler unregisters map outputs _only from the individual executor whose shuffle blocks could not be fetched_ (see [code|https://github.com/apache/spark/blame/bfb3ffe9b33a403a1f3b6f5407d34a477ce62c85/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1643]), so it can take several rounds of failed stage attempts to fail and clear output from all executors on the faulty host. If the number of executors on a host is greater than the stage retry limit, this can exhaust stage retry attempts and cause job failures.
>
> This "multiple rounds of recomputation to discover all failed executors on a host" problem was addressed by SPARK-19753, which added a {{spark.files.fetchFailure.unRegisterOutputOnHost}} configuration that promotes executor fetch failures into host-wide fetch failures (clearing output from all neighboring executors upon a single failure). However, that configuration is {{false}} by default.
>
> h2. Potential solutions
>
> I have a few ideas about how we can improve this situation:
> - Update the [YARN external shuffle service documentation|https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service] to recommend enabling NodeManager recovery.
> - Consider defaulting {{spark.files.fetchFailure.unRegisterOutputOnHost}} to {{true}}. This would improve out-of-the-box resiliency for large clusters. The trade-off here is a reduction of efficiency in case there are transient "false positive" fetch failures, but I suspect this case may be unlikely in practice (so the change of default could be an acceptable trade-off). See [prior discussion on GitHub|https://github.com/apache/spark/pull/18150#discussion_r119736751].
> - Modify DAGScheduler to add special-case handling for "Executor is not registered" exceptions that trigger FetchFailures: if we see this exception then it implies that the shuffle service failed to recover state, implying that all of its prior outputs are effectively unavailable. In this case, it _might_ be safe to unregister all host outputs irrespective of whether the {{unRegisterOutputOnHost}} flag is set (a rough sketch follows at the end of this message).
> -- This might require us to string-match on exceptions (so we can be backwards-compatible with old shuffle services, freeing users from needing to upgrade / restart NMs to pick up this fix).
> -- I suppose there's the potential for race conditions where the shuffle service restarts and produces _new_ map outputs from freshly-registered executors, only for us to turn around and unnecessarily clear those outputs as part of the cleanup of the pre-shuffle-service-restart outputs. If we assume shuffle service and executor deaths are coupled (i.e. that death of the shuffle service process implies death of all executors, something which I believe is true of both YARN NM and Standalone Worker death), then we could be a bit more precise and invalidate outputs from all _dead_ executors on that host.
>
> I'm open to other suggestions, too.
>
> /cc [~imranr] [~sitalke...@gmail.com] [~tgraves] [~vanzin] as FYI (since I think you've all worked on relevant parts of this code).
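For concreteness, here is a rough sketch of the DAGScheduler special case from the last bullet above. None of this is existing Spark code: the string match simply mirrors the exception quoted in the description, and the commented-out wiring assumes the block-manager address and failure message available in DAGScheduler's {{FetchFailed}} handling, plus {{MapOutputTrackerMaster.removeOutputsOnHost}} (which, as far as I can tell, has existed since SPARK-19753):
{code:scala}
// Illustrative helper: classify a fetch failure as "the shuffle service
// lost its executor registrations". String matching keeps the check
// backwards-compatible with old shuffle service versions that cannot
// report a structured error code.
def isLostRegistrationFailure(failureMessage: String): Boolean =
  failureMessage != null && failureMessage.contains("Executor is not registered")

// Inside DAGScheduler's FetchFailed handling, one could then clear the
// whole host regardless of spark.files.fetchFailure.unRegisterOutputOnHost:
//
//   if (isLostRegistrationFailure(failureMessage) && bmAddress != null) {
//     mapOutputTracker.removeOutputsOnHost(bmAddress.host)
//   }
{code}
To narrow the race described in the last sub-bullet, the host-wide call could be replaced with per-executor removal restricted to executors already known to be dead.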