[
https://issues.apache.org/jira/browse/HADOOP-2141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andy Konwinski updated HADOOP-2141:
-----------------------------------
Attachment: HADOOP-2141-v3.patch
Thanks Arun, I'll address your comments inline:
- JobInProgress.getSpeculative{Map|Reduce} are both called from synchronized
methods i.e. JobInProgress.findNew{Map|Reduce}Task; hence please mark these as
synchronized too, just to be future-proof.
A) Done
- JobInProgress.findSpeculativeTask's 'shouldRemove' parameter is always passed
in as 'false' (from getSpeculative{Map|Reduce}) ... do we even need this
parameter?
B) I removed the parameter. It was left over from when Hadoop used to pass a
TIP list that was specific per machine.
- JobInProgress.isTaskSlowEnoughToSpeculate gets
mapred.speculative.execution.slowTaskThreshold from the JobConf always - we
should just cache that in a private variable.
C) Done
- Ditto for
JobInProgress.isSlowTracker/mapred.speculative.execution.slowNodeThreshold
D) Done
- and JobInProgress.atSpeculativeCap/mapred.speculative.execution.speculativeCap.
E) Done
- (Also please remove the LOG.info for the config variable in
JobInProgress.isTaskSlowEnoughToSpeculate).
F) Done
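The caching change from items C-E can be sketched roughly as follows; this is an illustrative stand-in, not the patch itself, with java.util.Properties standing in for JobConf and the default values assumed:

```java
import java.util.Properties;

// Sketch of items C-E: read each mapred.speculative.execution.* value once
// from the job configuration and keep it in a private field, instead of
// querying the conf on every call. Properties stands in for JobConf here;
// the default values are illustrative assumptions.
public class SpeculationConfig {
    private final float slowTaskThreshold;
    private final float slowNodeThreshold;
    private final float speculativeCap;

    public SpeculationConfig(Properties conf) {
        // Parsed once at construction; hot paths read the cached fields.
        slowTaskThreshold = Float.parseFloat(
            conf.getProperty("mapred.speculative.execution.slowTaskThreshold", "1.0"));
        slowNodeThreshold = Float.parseFloat(
            conf.getProperty("mapred.speculative.execution.slowNodeThreshold", "1.0"));
        speculativeCap = Float.parseFloat(
            conf.getProperty("mapred.speculative.execution.speculativeCap", "0.1"));
    }

    public float getSlowTaskThreshold() { return slowTaskThreshold; }
    public float getSlowNodeThreshold() { return slowNodeThreshold; }
    public float getSpeculativeCap() { return speculativeCap; }
}
```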
- JobInProgress.findSpeculativeTask gets a List of TIPs, and it then proceeds
to convert that to a TIP[] for JobInProgress.isSlowTracker etc. - we should
just get all APIs to work with List<TIP> and do away with that conversion.
G) Done
- Can we keep a running count of 'progress' of TaskTrackers' tasks rather than
recompute them each time in JobInProgress.isSlowTracker? For large jobs it
might be significant...
H) In this patch (v3), we call task.getProgressRate() on each task in the
ProgressRateComparator, which returns the progressRate score for that task
(this isn't computed on the spot; it is updated asynchronously when the
progress for that TIP is reported). If we were to keep a running count in
JobInProgress that the TIPs are responsible for updating as they make
progress, via some sort of callback, that would add a lot of complexity, plus
the overhead of a data structure to push the updates into, when we only use
them while looking for speculative tasks, which is a relatively infrequent
operation. Thus I still see this pull model as better.
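The pull model above can be sketched as below; the Task class and method names are illustrative stand-ins for the TIP API, not the patch itself:

```java
import java.util.Comparator;

// Minimal sketch of the pull model from item H: each task caches a
// progressRate that is updated asynchronously when progress is reported,
// and the comparator simply reads the cached value at sort time, so no
// extra bookkeeping structure is needed between speculation checks.
class Task {
    private volatile double progressRate; // updated on each progress report
    Task(double rate) { this.progressRate = rate; }
    void reportProgress(double rate) { this.progressRate = rate; } // async update path
    double getProgressRate() { return progressRate; }
}

class ProgressRateComparator implements Comparator<Task> {
    // Ascending order: the slowest tasks (lowest progress rate) sort first.
    @Override
    public int compare(Task a, Task b) {
        return Double.compare(a.getProgressRate(), b.getProgressRate());
    }
}
```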
- JobInProgress.isTaskSlowEnoughToSpeculate really bothers me. It is called
from inside a loop (i.e. for each TIP) and it sorts the progress of each TIP.
This is potentially very expensive. At the very least we should sort the
TIPs once and even better - we should maintain a PriorityQueue of TIPs based on
their progress.
I) I eliminated the isTaskSlowEnoughToSpeculate function and the inner loop
behavior you pointed out by pulling everything into findSpeculativeTask, which
adds another sort operation to the already existing sort in findSpeculativeTask
(see J below).
- I'm guessing that sorting 'candidate speculative tasks' in
JobInProgress.findSpeculativeTask isn't prohibitively expensive since the
number of candidates is fairly small, could you please confirm?
J) As of this patch, we are using a double sorting behavior, which I don't see
a good way around for now. The first sort is to be sure we only launch
speculative tasks which are actually slow, the second one is to decide amongst
those slow tasks based on their expected completion time and this second sort
is considerably smaller (since it operates on the chopped down set of
candidates). The first sort will be sorting through all running tasks, which
for large MapReduce jobs will be in the tens of thousands, right? However,
remember that the progress of each task is not computed at access time (see H
above). We can't keep a snapshot of sorted progress around for very long before
it grows stale, but I still think that switching to a push model (from the
perspective of the tasks at progress update time) will add the overhead of a
heap insert for progress updates of every task for the entire job, when we
really only care about tasks that are running while we look for speculative
tasks (which is hopefully only at the end of map or reduce stages of a job).
If this is a concern, as an intermediate step before switching to a heap, we
could keep the sorted list of candidates around and only recompute it at most
every X (5?) seconds.
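The double sort described in J can be sketched roughly like this; the class and method names and the sample fraction are illustrative assumptions, and expected completion time is approximated as remaining work over progress rate:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of item J: first sort every running task by progress rate and chop
// off the slowest fraction, then sort that much smaller candidate set by
// expected completion time (longest remaining first). Names and the chop
// fraction are illustrative, not the patch itself.
class Speculation {
    static class Candidate {
        final double progressRate; // fraction of work completed per second
        final double progress;     // fraction complete, 0..1
        Candidate(double rate, double progress) {
            this.progressRate = rate;
            this.progress = progress;
        }
        double expectedTimeLeft() {
            return (1.0 - progress) / progressRate;
        }
    }

    static List<Candidate> findSpeculativeCandidates(List<Candidate> running,
                                                     double slowFraction) {
        // First sort: all running tasks, slowest progress rate first.
        List<Candidate> sorted = new ArrayList<>(running);
        sorted.sort(Comparator.comparingDouble((Candidate c) -> c.progressRate));
        // Chop down to the slowest fraction (e.g. 0.25).
        int keep = Math.max(1, (int) (sorted.size() * slowFraction));
        // Second, much smaller sort: longest expected time remaining first.
        List<Candidate> slow = new ArrayList<>(sorted.subList(0, keep));
        slow.sort(Comparator.comparingDouble(Candidate::expectedTimeLeft).reversed());
        return slow;
    }
}
```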
- Minor: Please adhere to the 80 character limit per-line.
K) Done
========
Another thought: we are currently doing the first sort/chop in
findSpeculativeTask to grab the slowest 25% of the tasks currently running. We
originally intended slowTaskThreshold to decide if a task was slow enough to
speculate, which would imply that if all currently running tasks appear
approximately equally slow, then none should be launched for speculative
execution. However, that is not the current behavior, which suggests we might
want to use variance of progressRate instead of just sorting tasks by
progressRate and taking the "slowest" 25% (which would be an arbitrary 25% of
the tasks if they all had the same progressRate).
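The variance idea above could be sketched as follows; the one-standard-deviation cutoff is an illustrative assumption, not anything in the patch:

```java
// Sketch of the variance-based alternative: compute the mean and standard
// deviation of the progress rates and flag only tasks more than one stddev
// below the mean, so a set of uniformly slow tasks produces no speculative
// candidates (unlike a fixed slowest-25% chop).
class RateOutliers {
    static boolean[] slowOutliers(double[] rates) {
        double mean = 0;
        for (double r : rates) mean += r;
        mean /= rates.length;
        double var = 0;
        for (double r : rates) var += (r - mean) * (r - mean);
        var /= rates.length;
        double cutoff = mean - Math.sqrt(var);
        boolean[] slow = new boolean[rates.length];
        for (int i = 0; i < rates.length; i++) {
            slow[i] = rates[i] < cutoff; // only genuine stragglers qualify
        }
        return slow;
    }
}
```

With identical rates the variance is zero, so nothing falls below the cutoff and no task is speculated.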
This version of the patch differs enough from the code we were using for our
experiments in the OSDI paper that we want to do some more testing with it
(maybe even gridmix), but we wanted to get it up for other eyes to look at
right away, hopefully aiming for 0.20 with this one.
> speculative execution start up condition based on completion time
> -----------------------------------------------------------------
>
> Key: HADOOP-2141
> URL: https://issues.apache.org/jira/browse/HADOOP-2141
> Project: Hadoop Core
> Issue Type: Improvement
> Components: mapred
> Affects Versions: 0.19.0
> Reporter: Koji Noguchi
> Assignee: Andy Konwinski
> Attachments: HADOOP-2141-v2.patch, HADOOP-2141-v3.patch,
> HADOOP-2141.patch
>
>
> We had one job with speculative execution hang.
> 4 reduce tasks were stuck with 95% completion because of a bad disk.
> Devaraj pointed out
> bq. One of the conditions that must be met for launching a speculative
> instance of a task is that it must be at least 20% behind the average
> progress, and this is not true here.
> It would be nice if speculative execution also starts up when tasks stop
> making progress.
> Devaraj suggested
> bq. Maybe, we should introduce a condition for average completion time for
> tasks in the speculative execution check.
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.