[ https://issues.apache.org/jira/browse/SPARK-17911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572289#comment-15572289 ]

Imran Rashid edited comment on SPARK-17911 at 10/13/16 3:44 PM:
----------------------------------------------------------------

Copying the earlier discussion on the PR here

from squito:
bq. I find myself frequently wondering about the purpose of this. It's commented 
very tersely on RESUBMIT_TIMEOUT, but I think it might be nice to add a longer 
comment here. I guess something like:
bq. "If we get one fetch-failure, we often get more fetch failures across 
multiple executors. We will get better parallelism when we resubmit the 
mapStage if we can resubmit when we know about as many of those failures as 
possible. So this is a heuristic to add a small delay to see if we gather a few 
more failures before we resubmit."
bq. We do not need the delay to figure out exactly which shuffle-map outputs are 
gone on the executor -- we always mark the executor as lost on a fetch failure, 
which means we mark all its map output as gone. (This is really confusing -- it 
looks like we only remove the one shuffle-map output that was involved in the 
fetch failure, but then the entire removal is buried inside another method a 
few lines further down.)
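To make the executor-loss point concrete, here is a minimal Scala sketch of a toy model. MapOutput, ToyOutputTracker, removeOne and removeOutputsOnExecutor are hypothetical names, not Spark's MapOutputTracker API. It only illustrates why no delay is needed to learn which outputs are gone: once the whole executor is marked lost, every map output it served disappears in one step, not just the one named in the FetchFailed.

```scala
import scala.collection.mutable

// Hypothetical toy model, not Spark's MapOutputTracker API: which executor
// currently serves each (shuffleId, mapId) output.
final case class MapOutput(shuffleId: Int, mapId: Int, execId: String)

final class ToyOutputTracker {
  private val outputs = mutable.Set.empty[MapOutput]

  def register(o: MapOutput): Unit = outputs += o

  // What the fetch-failure handling appears to do at first glance:
  // drop only the single output named in the FetchFailed.
  def removeOne(shuffleId: Int, mapId: Int, execId: String): Unit =
    outputs -= MapOutput(shuffleId, mapId, execId)

  // What effectively happens once the executor is marked lost:
  // every output it served, for every shuffle, is dropped at once.
  def removeOutputsOnExecutor(execId: String): Unit =
    outputs --= outputs.filter(_.execId == execId)

  // Map partitions of a shuffle that would have to be recomputed.
  def missing(shuffleId: Int, numMaps: Int): Set[Int] = {
    val present = outputs.collect { case MapOutput(`shuffleId`, mapId, _) => mapId }
    (0 until numMaps).toSet -- present
  }
}

object OutputTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new ToyOutputTracker
    // Shuffle 0: map outputs 0 and 1 live on exec-1, 2 and 3 on exec-2.
    Seq(
      MapOutput(0, 0, "exec-1"), MapOutput(0, 1, "exec-1"),
      MapOutput(0, 2, "exec-2"), MapOutput(0, 3, "exec-2")
    ).foreach(tracker.register)

    tracker.removeOne(0, 0, "exec-1")         // only map 0 is missing now
    println(tracker.missing(0, 4))
    tracker.removeOutputsOnExecutor("exec-1") // maps 0 and 1 are both missing
    println(tracker.missing(0, 4))
  }
}
```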
bq. I did some browsing through the history, and there used to be this comment:
{noformat}
     // Periodically resubmit failed stages if some map output fetches have failed and we have
     // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
     // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
     // the same time, so we want to make sure we've identified all the reduce tasks that depend
     // on the failed node.
{noformat}
bq. At least in the current version, this also sounds like a bad reason to have 
the delay. failedStage won't be resubmitted until mapStage completes anyway, and 
then it'll look to see what tasks it is missing. Adding a tiny delay on top of 
the natural delay for mapStage seems pretty pointless.
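The argument that failedStage cannot be resubmitted before mapStage completes rests on parent-first stage submission. Below is a toy Scala sketch of that idea only; ToyStage, ToyScheduler and stageFinished are hypothetical names, and the real DAGScheduler tracks far more state (jobs, stage attempts, pending partitions). It shows that resubmitting the failed stage first launches its missing parent and parks the child until the parent's output exists.

```scala
import scala.collection.mutable

// Hypothetical toy model of parent-first stage submission; names echo the
// discussion (submitStage, waiting stages) but this is not Spark's code.
final class ToyStage(val id: Int, val parents: List[ToyStage]) {
  var available: Boolean = false // true once all of this stage's outputs exist
}

final class ToyScheduler {
  val running = mutable.LinkedHashSet.empty[Int]
  val waiting = mutable.LinkedHashSet.empty[ToyStage]

  def submitStage(stage: ToyStage): Unit =
    if (!running(stage.id) && !waiting(stage)) {
      val missing = stage.parents.filterNot(_.available)
      if (missing.isEmpty) {
        running += stage.id          // all inputs exist: launch its tasks
      } else {
        missing.foreach(submitStage) // recompute the missing parents first...
        waiting += stage             // ...and park this stage until they finish
      }
    }

  // When a stage finishes, any waiting stage whose parents are now all
  // available gets submitted again.
  def stageFinished(stage: ToyStage): Unit = {
    stage.available = true
    running -= stage.id
    val ready = waiting.filter(_.parents.forall(_.available)).toList
    waiting --= ready
    ready.foreach(submitStage)
  }
}

object SubmitStageDemo {
  def main(args: Array[String]): Unit = {
    val mapStage = new ToyStage(0, Nil)               // its output was lost
    val failedStage = new ToyStage(1, List(mapStage)) // the reduce side
    val scheduler = new ToyScheduler

    scheduler.submitStage(failedStage) // resubmitting the failed stage...
    println(scheduler.running)         // ...only launches mapStage (id 0)
    scheduler.stageFinished(mapStage)  // once mapStage is done...
    println(scheduler.running)         // ...failedStage (id 1) finally runs
  }
}
```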
bq. I don't even think that the reason I gave in my suggested comment is a good 
one -- do you really expect failures in multiple executors? But it is at least 
logically consistent.

from markhamstra:
bq. I don't like "Periodically" in your suggested comment, since this is a 
one-shot action after a delay of RESUBMIT_TIMEOUT milliseconds.
bq. I agree that this delay-before-resubmit logic is suspect. If we are both 
thinking correctly that a 200 ms delay on top of the time to re-run the 
mapStage is all but inconsequential, then removing it in this PR would be fine. 
If there are unanticipated consequences, though, I'd prefer to have that change 
in a separate PR.

from squito:
bq. Yeah, probably a separate PR; sorry, this was just an opportunity for me to 
rant :)
bq. And sorry if I worded it poorly, but I was not suggesting the one w/ 
"Periodically" as a better comment -- in fact I think it's a bad comment; I just 
wanted to mention it was another description which used to be there long ago.
bq. This was my suggestion:
bq. If we get one fetch-failure, we often get more fetch failures across 
multiple executors. We will get better parallelism when we resubmit the 
mapStage if we can resubmit when we know about as many of those failures as 
possible. So this is a heuristic to add a small delay to see if we gather a few 
more failures before we resubmit.

from markhamstra:
bq. Ah, sorry for ascribing the prior comment to your preferences. That comment 
actually did make sense a long time ago when the resubmitting of stages really 
was done periodically by an Akka scheduled event that fired every so many 
seconds. I'm pretty sure the RESUBMIT_TIMEOUT stuff is also legacy code that 
doesn't make sense and isn't necessary any more.
bq. So, do you want to do the follow-up PR to get rid of it, or shall I?
bq. BTW, nothing wrong with your wording -- but my poor reading can create 
misunderstanding of even the clearest text.

from squito:
bq. If you are willing, could you please file the follow-up? I am bouncing 
between various things in my backlog -- though that change is small, I have a 
feeling it will merit extra discussion as a risky change, so it would be great if 
you drove it.

from markhamstra:
bq. OK, I can get started on that. I believe that leaves this PR ready to merge.

from markhamstra:
bq. I think that I am going to backtrack on creating a new PR, because I think 
that the RESUBMIT_TIMEOUT actually does still make sense.
bq. If we go way back in DAGScheduler history 
(https://github.com/apache/spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala),
we'll find that we had an event queue that was polled every 10 millis 
(POLL_TIMEOUT) and that fetch failures didn't produce separate resubmit 
events, but rather we called resubmitFailedStages within the event 
polling/handling loop if 50 millis (RESUBMIT_TIMEOUT) had passed since the last 
time a FetchFailed was received in a CompletionEvent. The handling of a 
FetchFailure didn't have any check of whether a failed stage was already in 
failedStages or any other kind of heuristic as to whether a resubmit was 
already scheduled for a failed stage, so the RESUBMIT_TIMEOUT was effectively 
the only thing preventing multiple resubmits from occurring as quickly as every 
10 ms.
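A rough discrete-time model of the branch-0.8 loop just described may help show why the timeout mattered there. It is a sketch that assumes the loop wakes exactly every POLL_TIMEOUT (10 ms) and treats the failed set as a simple flag; Branch08LoopModel and resubmitTimes are made-up names, not the original code. With RESUBMIT_TIMEOUT at 0, a trickle of fetch failures triggers a resubmit on every poll during the burst; at 50 ms they collapse into a single resubmit once the failures stop.

```scala
// Hypothetical discrete-time model of the branch-0.8 loop, not the original
// code: the loop wakes every POLL_TIMEOUT ms, handles fetch failures that
// arrived since the last wake-up, and resubmits the failed stages only once
// RESUBMIT_TIMEOUT ms have passed since the *last* fetch failure.
object Branch08LoopModel {
  val PollTimeoutMs = 10L

  /** Simulated times (ms) at which resubmitFailedStages would run. */
  def resubmitTimes(failureTimesMs: Seq[Long], resubmitTimeoutMs: Long, endMs: Long): Vector[Long] = {
    var pending = failureTimesMs.sorted.toList
    var failedNonEmpty = false
    var lastFetchFailureTime = 0L
    val resubmits = Vector.newBuilder[Long]

    var now = 0L
    while (now <= endMs) {
      // Handle every CompletionEvent carrying a FetchFailed that arrived by now.
      val (arrived, later) = pending.span(_ <= now)
      pending = later
      if (arrived.nonEmpty) {
        failedNonEmpty = true // no check for an already-pending resubmit
        lastFetchFailureTime = arrived.last
      }
      // The timeout is the only guard against resubmitting on every poll.
      if (failedNonEmpty && now - lastFetchFailureTime >= resubmitTimeoutMs) {
        resubmits += now
        failedNonEmpty = false // resubmitFailedStages clears the failed set
      }
      now += PollTimeoutMs
    }
    resubmits.result()
  }

  def main(args: Array[String]): Unit = {
    // A burst of fetch failures trickling in over roughly 50 ms.
    val failures = Seq(0L, 5L, 12L, 25L, 31L, 47L)
    println(resubmitTimes(failures, resubmitTimeoutMs = 0L, endMs = 200L))  // resubmit on each poll
    println(resubmitTimes(failures, resubmitTimeoutMs = 50L, endMs = 200L)) // one batched resubmit
  }
}
```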
bq. RESUBMIT_TIMEOUT has persisted through the multiple iterations of the 
DAGScheduler since then, and now it is effectively making sure that the time 
between ResubmitFailedStages events is at least 200 ms. The time to actually 
complete any of the stages in failedStages when the ResubmitFailedStages event 
is handled doesn't really come into play, since failedStages is cleared within 
resubmitFailedStages and that method returns as soon as the stages that were 
queued up are resubmitted, not actually done re-calculating. In other words, 
handling a ResubmitFailedStages event should be quick, and causes failedStages 
to be cleared, allowing the next ResubmitFailedStages event to be posted from 
the handling of another FetchFailed. If, as expected, there are many fetch 
failures for a single stage, and there is no RESUBMIT_TIMEOUT, then it is quite 
likely that there will be a burst of resubmit events (and corresponding log 
messages) and submitStage calls made in rapid succession.
bq. I think I'm inclined to keep the RESUBMIT_TIMEOUT throttle.
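The throttle in the current scheduler can be modelled the same way. In this hypothetical sketch (ResubmitThrottleModel is a made-up name, and every other check in the real FetchFailed handler is left out), only the first FetchFailed after failedStages has been emptied schedules a ResubmitFailedStages event RESUBMIT_TIMEOUT ms later, and handling that event clears failedStages immediately. Judging by the issue title, the delayed post in Spark goes through messageScheduler; the model uses simulated time instead. With a 200 ms timeout a burst collapses into one event and consecutive events stay at least 200 ms apart; with no timeout, each fetch failure yields its own resubmit.

```scala
import scala.collection.mutable

// Hypothetical event-time model of the throttle as described above, not
// Spark's code: the first FetchFailed after failedStages was emptied schedules
// one ResubmitFailedStages event RESUBMIT_TIMEOUT ms later; handling that event
// clears failedStages right away. Other checks in the real handler are omitted.
object ResubmitThrottleModel {

  /** Simulated times (ms) at which a ResubmitFailedStages event is handled. */
  def resubmitTimes(failureTimesMs: Seq[Long], resubmitTimeoutMs: Long): Vector[Long] = {
    val failedStages = mutable.Set.empty[String]
    var scheduledAt: Option[Long] = None
    val handled = Vector.newBuilder[Long]

    def handleResubmit(at: Long): Unit = {
      handled += at
      failedStages.clear() // resubmits the stages and returns without waiting on them
      scheduledAt = None
    }

    for (t <- failureTimesMs.sorted) {
      // A ResubmitFailedStages event that is already due runs before this FetchFailed.
      scheduledAt.filter(_ <= t).foreach(handleResubmit)
      // Only schedule an event if none is pending, i.e. failedStages is empty.
      if (failedStages.isEmpty) scheduledAt = Some(t + resubmitTimeoutMs)
      failedStages += "failedStage"
      failedStages += "mapStage"
    }
    scheduledAt.foreach(handleResubmit) // drain the last pending event
    handled.result()
  }

  def main(args: Array[String]): Unit = {
    val failures = Seq(0L, 5L, 12L, 25L, 31L, 47L)
    println(resubmitTimes(failures, resubmitTimeoutMs = 200L)) // one event for the burst
    println(resubmitTimes(failures, resubmitTimeoutMs = 0L))   // one event per fetch failure
  }
}
```

Simulated time keeps the sketch deterministic; a live version would post the event from a ScheduledExecutorService rather than replaying a sorted list of timestamps.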



> Scheduler does not need messageScheduler for ResubmitFailedStages
> -----------------------------------------------------------------
>
>                 Key: SPARK-17911
>                 URL: https://issues.apache.org/jira/browse/SPARK-17911
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler
>    Affects Versions: 2.0.0
>            Reporter: Imran Rashid
>
> It's not totally clear what the purpose of the {{messageScheduler}} is in 
> {{DAGScheduler}}.  It can perhaps be eliminated completely; or perhaps we 
> should just clearly document its purpose.
> This comes from a long discussion w/ [~markhamstra] on an unrelated PR here: 
> https://github.com/apache/spark/pull/15335/files/c80ad22a242255cac91cce2c7c537f9b21100f70#diff-6a9ff7fb74fd490a50462d45db2d5e11
> But it's tricky, so I'm breaking it out here to archive the discussion.
> Note: this issue requires a decision on what to do before a code change, so 
> let's just discuss it on JIRA first.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
