[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019123#comment-16019123 ]

Josh Rosen commented on SPARK-20178:
------------------------------------

Looking over a few of the tickets linked to this fetch-failure-handling 
umbrella, I've noticed a commonality across several linked JIRAs: folks are 
proposing to treat a single fetch failure from a node as though all outputs on 
that node were lost. While this helps avoid the behavior where we keep 
repeatedly trying to refetch from a malfunctioning node or from an external 
shuffle service which has disappeared, it may go too far in some situations 
and can cause unnecessary recomputation. For example, in a multi-user, 
multi-job environment there can be a high cost to a false positive where a 
healthy block manager/shuffle service is marked as unavailable following a 
single FetchFailure: this takes a failure which might be isolated to a single 
stage and promotes it into a wider failure that can impact other concurrently 
running stages (or that destroys the ability to leverage the implicit caching 
of shuffle outputs across job runs).
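
To make the aggressive behavior concrete, here's a rough sketch of 
"treat the whole node as lost" bookkeeping. The OutputRegistry class, its map 
layout (shuffleId -> mapId -> host), and its method names are made up for 
illustration; this is not Spark's actual MapOutputTracker API.

{code:scala}
import scala.collection.mutable

// Hypothetical, simplified registry: shuffleId -> (mapId -> host).
class OutputRegistry {
  private val outputs = mutable.Map.empty[Int, mutable.Map[Int, String]]

  def register(shuffleId: Int, mapId: Int, host: String): Unit =
    outputs.getOrElseUpdate(shuffleId, mutable.Map.empty)(mapId) = host

  // Aggressive response: a single FetchFailure from `host` invalidates every
  // shuffle's outputs on that host, forcing recomputation even for shuffles
  // used by other concurrently running stages or later jobs.
  def removeAllOutputsOnHost(host: String): Unit =
    outputs.values.foreach { byMapId =>
      val lost = byMapId.collect { case (mapId, h) if h == host => mapId }
      byMapId --= lost
    }
}
{code}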

To work around this problem, it looks like there are several proposals (but no 
PRs yet) for more complex approaches which attempt to infer whether a fetch 
failure indicates complete unavailability by keeping statistics on the number 
of fetch failures attributed to each node. The idea here is very similar to 
executor blacklisting, except applied to output locations. This is a good idea 
for the longer term because it can help to mitigate nodes which silently 
corrupt most data written to disk (a failure mode we don't handle well today), 
but I don't think it's the right fix for the immediate issue being discussed 
in this ticket: these proposals will require a significant amount of new 
bookkeeping logic to implement (which is hard to do efficiently and without 
causing memory leaks / perf issues) and involve threshold-based detection 
logic which can require tuning to get right.
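
For comparison, a hedged sketch of what such statistics-based detection might 
look like; the class name and threshold are invented for illustration (not an 
existing Spark API), and it glosses over exactly the bookkeeping, eviction, 
and tuning concerns mentioned above.

{code:scala}
import scala.collection.mutable

// Hypothetical per-host fetch-failure counter, analogous to executor
// blacklisting but applied to output locations.
class FetchFailureTracker(threshold: Int) {
  private val failuresPerHost = mutable.Map.empty[String, Int].withDefaultValue(0)

  // Record one fetch failure and report whether the host should now be
  // treated as fully unavailable.
  def recordFailure(host: String): Boolean = {
    failuresPerHost(host) += 1
    failuresPerHost(host) >= threshold
  }

  // A real implementation would also need expiry/eviction of old entries to
  // avoid memory leaks, which is exactly the bookkeeping cost noted above.
  def reset(host: String): Unit = failuresPerHost -= host
}
{code}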

As a compromise, I would like to propose a slightly weaker version of 
SPARK-20115 and SPARK-19753: when the DAGScheduler is notified of a 
FetchFailure from a node, mark _that shuffle's output locations on that 
node_ as unavailable (rather than all shuffles' outputs on that node). The 
rationale is that the FetchFailure is already going to cause recomputation of 
that shuffle, and the likelihood of the FetchFailure being a transient failure 
is relatively small: tasks already have internal retries when fetching (see 
both RetryingBlockFetcher and [~davies]'s patch for retrying within the task 
when small fetched shuffle blocks are determined to be corrupt), so if a task 
fails with a FetchFailure then it seems likely that the output we tried to 
fetch really is unavailable or corrupt.
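
Here's a minimal sketch of the narrower invalidation being proposed, again 
using an invented map layout (shuffleId -> mapId -> host) rather than the real 
MapOutputTracker structures: compared with the removeAllOutputsOnHost sketch 
above, the only change is scoping the removal to the failing shuffle.

{code:scala}
import scala.collection.mutable

// On a FetchFailure for (shuffleId, host), drop only that shuffle's map
// outputs on that host. Other shuffles' outputs on the same host stay
// registered, so concurrently running stages and later jobs can still
// reuse them.
def removeShuffleOutputsOnHost(
    outputs: mutable.Map[Int, mutable.Map[Int, String]],
    shuffleId: Int,
    host: String): Unit = {
  outputs.get(shuffleId).foreach { byMapId =>
    val lost = byMapId.collect { case (mapId, h) if h == host => mapId }
    byMapId --= lost
  }
}
{code}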

I think that this proposal should be simple to implement and to backport 
(optionally behind a feature flag), and hopefully it won't be controversial 
because it is much more limited in the scope of the extra inferences it draws 
from FetchFailures. It also does not preclude the other proposals from being 
implemented later.
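
If it were backported behind a flag, toggling it might look roughly like the 
following; the configuration key is invented for this sketch and is not an 
existing Spark setting.

{code:scala}
import org.apache.spark.SparkConf

// Hypothetical feature flag; the key name is made up for illustration.
val conf = new SparkConf()
  .set("spark.shuffle.unregisterShuffleOutputsOnFetchFailure", "true")
{code}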

Feedback on this is very welcome. If there's support then I'd like to take a 
shot at implementing it.

> Improve Scheduler fetch failures
> --------------------------------
>
>                 Key: SPARK-20178
>                 URL: https://issues.apache.org/jira/browse/SPARK-20178
>             Project: Spark
>          Issue Type: Epic
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Thomas Graves
>
> We have been having a lot of discussions around improving the handling of 
> fetch failures. There are 4 JIRAs currently related to this.
> We should try to get a list of things we want to improve and come up with one 
> cohesive design.
> SPARK-20163, SPARK-20091, SPARK-14649, and SPARK-19753
> I will put my initial thoughts in a follow on comment.


