[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-05-24 Thread sitalkedia
Github user sitalkedia closed the pull request at:

https://github.com/apache/spark/pull/17297



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107044660
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -929,12 +946,22 @@ class DAGScheduler(
     }
   }

-  /** Called when stage's parents are available and we can now do its task. */
+  /**
+   * Called when stage's parents are available and we can now run its task.
+   * This only submits the partitions which are missing and have not been
+   * submitted to the lower-level scheduler for execution.
+   */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")

-    // First figure out the indexes of partition ids to compute.
-    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
+    val missingPartitions = stage.findMissingPartitions()
+    val partitionsToCompute =
+      missingPartitions.filter(id => !stage.pendingPartitions.contains(id))
--- End diff --

```scala
missingPartitions.filterNot(stage.pendingPartitions)
```
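
The one-liner works because a Scala `Set[Int]` is itself an `Int => Boolean`, so it can be passed directly to `filterNot` as the predicate. A minimal, self-contained sketch (illustrative values, not the actual Spark code):

```scala
import scala.collection.mutable

// A Set[Int] extends Int => Boolean, so it can serve as filterNot's predicate.
val pendingPartitions = mutable.HashSet(1, 3)
val missingPartitions = Seq(0, 1, 2, 3, 4)

val partitionsToCompute = missingPartitions.filterNot(pendingPartitions)
assert(partitionsToCompute == Seq(0, 2, 4)) // already-pending partitions are skipped
```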



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107044272
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -803,6 +810,16 @@ class DAGScheduler(
     stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
   }

+  private[scheduler] def handleTasksAborted(
+      stageId: Int,
+      tasks: Seq[Task[_]]): Unit = {
+    for (stage <- stageIdToStage.get(stageId)) {
+      for (task <- tasks) {
+        stage.pendingPartitions -= task.partitionId
+      }
+    }
--- End diff --

```scala
for {
  stage <- stageIdToStage.get(stageId)
  task <- tasks
} stage.pendingPartitions -= task.partitionId
```
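
For reference, a combined for-comprehension over an `Option` and a `Seq` desugars to nested `foreach` calls, so it behaves exactly like the nested loops above. A self-contained sketch with illustrative types and values:

```scala
import scala.collection.mutable

val stageOpt: Option[mutable.HashSet[Int]] = Some(mutable.HashSet(0, 1, 2, 3))
val abortedPartitions = Seq(1, 3)

// Desugars to: stageOpt.foreach(pending => abortedPartitions.foreach(pending -= _))
for {
  pendingPartitions <- stageOpt
  partitionId <- abortedPartitions
} pendingPartitions -= partitionId

assert(stageOpt.get == mutable.HashSet(0, 2))
```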



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107040190
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -418,6 +424,15 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }

+  /** Get the epoch for map output for a shuffle, if it is available */
+  def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
+    val arrayOpt = mapStatuses.get(shuffleId)
+    if (arrayOpt.isDefined && arrayOpt.get != null && arrayOpt.get(mapId) != null) {
+      return Some(epochForMapStatus.get(shuffleId).get(mapId))
+    }
+    None
+  }
--- End diff --

First, `arrayOpt.get != null` isn't necessary, since we don't put `null` values into `mapStatuses`. Second, `epochForMapStatus.get(shuffleId).get` is the same as `epochForMapStatus(shuffleId)`. Third, I don't like all the explicit `get`s, `null` checks, and the unnecessary non-local `return`. To my mind, this is better:
```scala
  def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
    for {
      mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray =>
        Option(mapStatusArray(mapId))
      }
    } yield epochForMapStatus(shuffleId)(mapId)
  }
```
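
Since the bound `mapStatus` is never used, the same chain can also be written as a `flatMap` followed by `map`. A self-contained sketch of that `Option` chain over assumed toy data (not the PR's actual structures):

```scala
import scala.collection.mutable

// Toy stand-ins: a null slot models a missing map output.
val mapStatuses = mutable.HashMap(0 -> Array[String]("status-A", null))
val epochForMapStatus = mutable.HashMap(0 -> Array(5L, 6L))

def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] =
  mapStatuses.get(shuffleId)                       // None if the shuffle is unknown
    .flatMap(statuses => Option(statuses(mapId)))  // None if the slot is null
    .map(_ => epochForMapStatus(shuffleId)(mapId))

assert(getEpochForMapOutput(0, 0) == Some(5L)) // output present -> its epoch
assert(getEpochForMapOutput(0, 1) == None)     // null slot -> None
assert(getEpochForMapOutput(9, 0) == None)     // unknown shuffle -> None
```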



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107018874
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
     val array = mapStatuses(shuffleId)
     array.synchronized {
       array(mapId) = status
+      val epochs = epochForMapStatus.get(shuffleId).get
--- End diff --

```scala
val epochs = epochForMapStatus(shuffleId)
```
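
For what it's worth, `map(key)` and `map.get(key).get` behave identically (both throw `NoSuchElementException` for a missing key); the `apply` form is just the idiomatic spelling. A tiny sketch with illustrative data:

```scala
val epochForMapStatus = Map(7 -> Array(1L, 2L))

// Both expressions return the same array; apply is the idiomatic form.
assert(epochForMapStatus(7) sameElements epochForMapStatus.get(7).get)
```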



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107018555
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
     val array = mapStatuses(shuffleId)
     array.synchronized {
       array(mapId) = status
+      val epochs = epochForMapStatus.get(shuffleId).get
+      epochs(mapId) = epoch
     }
   }

   /** Register multiple map output information for the given shuffle */
   def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
-    mapStatuses.put(shuffleId, statuses.clone())
     if (changeEpoch) {
       incrementEpoch()
     }
+    mapStatuses.put(shuffleId, statuses.clone())
--- End diff --

What was the point of moving this?



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107017201
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1265,64 +1280,11 @@ class DAGScheduler(
         val failedStage = stageIdToStage(task.stageId)
         val mapStage = shuffleIdToMapStage(shuffleId)

-        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
-          logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
-            s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
-            s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
-        } else {
-          // It is likely that we receive multiple FetchFailed for a single stage (because we have
-          // multiple tasks running concurrently on different executors). In that case, it is
-          // possible the fetch failure has already been handled by the scheduler.
-          if (runningStages.contains(failedStage)) {
-            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
-              s"due to a fetch failure from $mapStage (${mapStage.name})")
-            markStageAsFinished(failedStage, Some(failureMessage))
-          } else {
-            logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
-              s"longer running")
-          }
-
-          val shouldAbortStage =
-            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
-            disallowStageRetryForTest
-
-          if (shouldAbortStage) {
-            val abortMessage = if (disallowStageRetryForTest) {
-              "Fetch failure will not retry stage due to testing config"
-            } else {
-              s"""$failedStage (${failedStage.name})
-                 |has failed the maximum allowable number of
-                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
-                 |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
-            }
-            abortStage(failedStage, abortMessage, None)
-          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
-            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
-            val noResubmitEnqueued = !failedStages.contains(failedStage)
-            failedStages += failedStage
-            failedStages += mapStage
-            if (noResubmitEnqueued) {
-              // We expect one executor failure to trigger many FetchFailures in rapid succession,
-              // but all of those task failures can typically be handled by a single resubmission of
-              // the failed stage.  We avoid flooding the scheduler's event queue with resubmit
-              // messages by checking whether a resubmit is already in the event queue for the
-              // failed stage.  If there is already a resubmit enqueued for a different failed
-              // stage, that event would also be sufficient to handle the current failed stage, but
-              // producing a resubmit for each failed stage makes debugging and logging a little
-              // simpler while not producing an overwhelming number of scheduler events.
-              logInfo(
-                s"Resubmitting $mapStage (${mapStage.name}) and " +
-                s"$failedStage (${failedStage.name}) due to fetch failure"
-              )
-              messageScheduler.schedule(
-                new Runnable {
-                  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-                },
-                DAGScheduler.RESUBMIT_TIMEOUT,
-                TimeUnit.MILLISECONDS
-              )
-            }
-          }
+        val epochForMapOutput = mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
+        // It is possible that the map output was regenerated by rerun of the stage and the
+        // fetch failure is being reported for stale map output. In that case, we should just
+        // ignore the fetch failure and relaunch the task with latest map output info.
+        if (epochForMapOutput.nonEmpty && epochForMapOutput.get <= task.epoch) {
--- End diff --

I'd be inclined to do this without the extra binding and `get`:
```scala
for (epochForMapOutput <- mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
     if epochForMapOutput <= task.epoch) {
  // Mark the map whose fetch failed as broken in the map stage
  if (mapId != -1) {
    mapStage.removeOutputLoc(mapId, bmAddress)
```
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-18 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r106774683
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -193,13 +193,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
       val stageTaskSets =
         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
       stageTaskSets(taskSet.stageAttemptId) = manager
-      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
--- End diff --

@squito - That's correct, this is checking that we should never have more than one non-zombie attempt of a stage running. But in scenario (d) you described below, we will end up with more than one non-zombie attempt.

However, my point is that there is no reason to disallow multiple concurrent attempts of a stage; the only thing we need to guarantee is that those attempts run mutually exclusive sets of tasks. With this change, since the dag scheduler already keeps track of submitted/running tasks, it can guarantee that it will not resubmit duplicate tasks for a stage.



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r106774285
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -193,13 +193,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
       val stageTaskSets =
         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
       stageTaskSets(taskSet.stageAttemptId) = manager
-      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
--- End diff --

actually, that is not really the point of this check. It's just checking whether one stage has two task sets (aka stage attempts) where both are in the "non-zombie" state. It doesn't do any checks at all on which tasks are actually in those task sets.

This is just checking an invariant which we believe to always be true, but we figure it's better to fail fast if we hit this condition rather than proceed with some inconsistent state. This check was added because behavior gets *really* confusing when the invariant is violated, and though we think it should always hold, we've still hit cases where it happens.



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-15 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r106315206
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -193,13 +193,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
       val stageTaskSets =
         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
       stageTaskSets(taskSet.stageAttemptId) = manager
-      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
--- End diff --
--- End diff --

Please note that this check is no longer needed, because the DagScheduler already keeps track of running tasks and does not submit duplicate tasks anymore.



[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...

2017-03-14 Thread sitalkedia
GitHub user sitalkedia opened a pull request:

https://github.com/apache/spark/pull/17297

[SPARK-14649][CORE] DagScheduler should not run duplicate tasks on fe…

## What changes were proposed in this pull request?

When a fetch failure occurs, the DAGScheduler re-launches the previous stage (to re-generate output that was missing), and then re-launches all tasks in the stage with the fetch failure that hadn't completed when the fetch failure occurred (the DAGScheduler re-launches all of the tasks whose output data is not available -- which is equivalent to the set of tasks that hadn't yet completed). This sometimes leads to wasteful duplicate task runs for jobs with long-running tasks.

To address the issue, the following changes have been made:

1. When a fetch failure happens, the task set manager asks the dag scheduler to abort all the non-running tasks; the tasks already running in the task set are not killed.
2. When a task is aborted, the dag scheduler adds the task back to the pending task list.
3. When the stage is resubmitted, the dag scheduler only resubmits the tasks that are still in the pending state (see the sketch below).
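
A minimal, self-contained sketch of the bookkeeping described above (names and values are illustrative, not the actual Spark implementation):

```scala
import scala.collection.mutable

// Partitions whose tasks have already been handed to the lower-level scheduler.
val pendingPartitions = mutable.HashSet.empty[Int]

// Stub: in the real scheduler this asks the stage which outputs are missing.
def findMissingPartitions(): Seq[Int] = Seq(0, 1, 2, 3)

// On (re)submission, launch only missing partitions that are not already pending.
def submitMissingTasks(): Seq[Int] = {
  val toCompute = findMissingPartitions().filterNot(pendingPartitions)
  pendingPartitions ++= toCompute
  toCompute
}

// An aborted (non-running) task is removed from the pending set, so it
// becomes eligible for relaunch on the next submission.
def handleTaskAborted(partitionId: Int): Unit = pendingPartitions -= partitionId

assert(submitMissingTasks() == Seq(0, 1, 2, 3)) // first submission launches everything
handleTaskAborted(2)                            // one task is aborted after a fetch failure
assert(submitMissingTasks() == Seq(2))          // resubmission relaunches only that task
```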


## How was this patch tested?

Added new tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sitalkedia/spark avoid_duplicate_tasks_new

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17297.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17297


commit e5429d309801bffb8ddc907fb4800efb6fb1a2fa
Author: Sital Kedia 
Date:   2016-04-15T23:44:23Z

[SPARK-14649][CORE] DagScheduler should not run duplicate tasks on fetch failure



