[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user sitalkedia closed the pull request at: https://github.com/apache/spark/pull/17297 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107044660

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -929,12 +946,22 @@ class DAGScheduler(
           }
         }
    -  /** Called when stage's parents are available and we can now do its task. */
    +  /**
    +   * Called when stage's parents are available and we can now run its task.
    +   * This only submits the partitions which are missing and have not been
    +   * submitted to the lower-level scheduler for execution.
    +   */
       private def submitMissingTasks(stage: Stage, jobId: Int) {
         logDebug("submitMissingTasks(" + stage + ")")
    -    // First figure out the indexes of partition ids to compute.
    -    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    +    val missingPartitions = stage.findMissingPartitions()
    +    val partitionsToCompute =
    +      missingPartitions.filter(id => !stage.pendingPartitions.contains(id))
    --- End diff --

```scala
missingPartitions.filterNot(stage.pendingPartitions)
```
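The one-line suggestion works because a Scala `Set[Int]` is itself a function `Int => Boolean`, so it can be passed straight to `filterNot`. The sketch below checks that both forms select the same partitions; `PartitionFilterSketch` is a hypothetical name, and a plain `mutable.HashSet[Int]` stands in for `stage.pendingPartitions`.

```scala
import scala.collection.mutable

// Hypothetical helper object: `missing` plays the role of
// stage.findMissingPartitions() and `pending` of stage.pendingPartitions.
object PartitionFilterSketch {
  // Form in the diff: an explicit lambda around contains.
  def viaFilter(missing: Seq[Int], pending: mutable.HashSet[Int]): Seq[Int] =
    missing.filter(id => !pending.contains(id))

  // Suggested form: the set itself is the predicate given to filterNot.
  def viaFilterNot(missing: Seq[Int], pending: mutable.HashSet[Int]): Seq[Int] =
    missing.filterNot(pending)
}
```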
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107044272

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -803,6 +810,16 @@ class DAGScheduler(
         stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
       }
    +  private[scheduler] def handleTasksAborted(
    +      stageId: Int,
    +      tasks: Seq[Task[_]]): Unit = {
    +    for (stage <- stageIdToStage.get(stageId)) {
    +      for (task <- tasks) {
    +        stage.pendingPartitions -= task.partitionId
    +      }
    +    }
    --- End diff --

```scala
for {
  stage <- stageIdToStage.get(stageId)
  task <- tasks
} stage.pendingPartitions -= task.partitionId
```
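The nested and flattened loops behave the same: a `for` with two generators and no `yield` desugars to nested `foreach` calls, so an unknown stage id (a `None` from the lookup) is simply a no-op. This is a minimal sketch with hypothetical stand-in types (`StageSketch`, `TaskSketch`, `AbortSketch`), not Spark's own `Stage` and `Task`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's Stage and Task.
final case class StageSketch(id: Int, pendingPartitions: mutable.HashSet[Int])
final case class TaskSketch(partitionId: Int)

object AbortSketch {
  val stageIdToStage = mutable.HashMap[Int, StageSketch]()

  // Nested form, as in the diff.
  def handleTasksAbortedNested(stageId: Int, tasks: Seq[TaskSketch]): Unit = {
    for (stage <- stageIdToStage.get(stageId)) {
      for (task <- tasks) {
        stage.pendingPartitions -= task.partitionId
      }
    }
  }

  // Flattened form: one `for` with two generators, which desugars to
  // stageIdToStage.get(stageId).foreach(stage => tasks.foreach(task => ...)).
  def handleTasksAborted(stageId: Int, tasks: Seq[TaskSketch]): Unit =
    for {
      stage <- stageIdToStage.get(stageId)
      task <- tasks
    } stage.pendingPartitions -= task.partitionId
}
```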
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107040190

    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -418,6 +424,15 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
         cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
       }
    +  /** Get the epoch for map output for a shuffle, if it is available */
    +  def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
    +    val arrayOpt = mapStatuses.get(shuffleId)
    +    if (arrayOpt.isDefined && arrayOpt.get != null && arrayOpt.get(mapId) != null) {
    +      return Some(epochForMapStatus.get(shuffleId).get(mapId))
    +    }
    +    None
    +  }
    --- End diff --

First, `arrayOpt.get != null` isn't necessary since we don't put `null` values into `mapStatuses`. Second, `epochForMapStatus.get(shuffleId).get` is the same as `epochForMapStatus(shuffleId)`. Third, I don't like all the explicit `get`s, the `null` checks, and the unnecessary non-local `return`. To my mind, this is better:

```scala
def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
  for {
    mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray =>
      Option(mapStatusArray(mapId))
    }
  } yield epochForMapStatus(shuffleId)(mapId)
}
```
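Here is a self-contained version of the suggested `Option`-based lookup, assuming simplified types: `MapStatusSketch` and `EpochLookupSketch` are hypothetical stand-ins, and both tables are plain `mutable.HashMap`s keyed by shuffle id. A missing shuffle and an unfilled (`null`) slot both come back as `None`, with no explicit `get` or `return`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's MapStatus.
final case class MapStatusSketch(location: String)

class EpochLookupSketch {
  val mapStatuses = mutable.HashMap[Int, Array[MapStatusSketch]]()
  val epochForMapStatus = mutable.HashMap[Int, Array[Long]]()

  // An unknown shuffle yields None from `get`; a null array slot becomes None
  // via Option(...). Only a registered output produces Some(epoch).
  def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] =
    for {
      _ <- mapStatuses.get(shuffleId).flatMap(statusArray => Option(statusArray(mapId)))
    } yield epochForMapStatus(shuffleId)(mapId)
}
```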
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107018874

    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
         val array = mapStatuses(shuffleId)
         array.synchronized {
           array(mapId) = status
    +      val epochs = epochForMapStatus.get(shuffleId).get
    --- End diff --

```scala
val epochs = epochForMapStatus(shuffleId)
```
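For illustration only, here is a minimal, hypothetical tracker (`RegisterSketch`, with `String` statuses instead of `MapStatus`) that records each map output's epoch next to its status under the same per-shuffle lock, using the `apply` form suggested above. On a missing key, `m(k)` and `m.get(k).get` both throw `NoSuchElementException`, so the shorter form loses nothing.

```scala
import scala.collection.mutable

// Hypothetical, simplified tracker; not Spark's MapOutputTrackerMaster.
// The epoch is a Long counter guarded by this object's lock.
class RegisterSketch(numMaps: Int) {
  private var epoch: Long = 0L
  val mapStatuses = mutable.HashMap[Int, Array[String]]()
  val epochForMapStatus = mutable.HashMap[Int, Array[Long]]()

  def currentEpoch: Long = synchronized { epoch }
  def incrementEpoch(): Unit = synchronized { epoch += 1 }

  def registerShuffle(shuffleId: Int): Unit = {
    mapStatuses(shuffleId) = new Array[String](numMaps)
    epochForMapStatus(shuffleId) = new Array[Long](numMaps)
  }

  // Stores the status and the epoch it was registered in while holding the
  // per-shuffle array's lock; epochForMapStatus(shuffleId) replaces .get(...).get.
  def registerMapOutput(shuffleId: Int, mapId: Int, status: String): Unit = {
    val array = mapStatuses(shuffleId)
    array.synchronized {
      array(mapId) = status
      val epochs = epochForMapStatus(shuffleId)
      epochs(mapId) = currentEpoch
    }
  }
}
```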
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107018555

    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
         val array = mapStatuses(shuffleId)
         array.synchronized {
           array(mapId) = status
    +      val epochs = epochForMapStatus.get(shuffleId).get
    +      epochs(mapId) = epoch
         }
       }
       /** Register multiple map output information for the given shuffle */
       def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
    -    mapStatuses.put(shuffleId, statuses.clone())
         if (changeEpoch) {
           incrementEpoch()
         }
    +    mapStatuses.put(shuffleId, statuses.clone())
    --- End diff --

What was the point of moving this?
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r107017201

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1265,64 +1280,11 @@ class DAGScheduler(
             val failedStage = stageIdToStage(task.stageId)
             val mapStage = shuffleIdToMapStage(shuffleId)
    -        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
    -          logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
    -            s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
    -            s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
    -        } else {
    -          // It is likely that we receive multiple FetchFailed for a single stage (because we have
    -          // multiple tasks running concurrently on different executors). In that case, it is
    -          // possible the fetch failure has already been handled by the scheduler.
    -          if (runningStages.contains(failedStage)) {
    -            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
    -              s"due to a fetch failure from $mapStage (${mapStage.name})")
    -            markStageAsFinished(failedStage, Some(failureMessage))
    -          } else {
    -            logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
    -              s"longer running")
    -          }
    -
    -          val shouldAbortStage =
    -            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    -            disallowStageRetryForTest
    -
    -          if (shouldAbortStage) {
    -            val abortMessage = if (disallowStageRetryForTest) {
    -              "Fetch failure will not retry stage due to testing config"
    -            } else {
    -              s"""$failedStage (${failedStage.name})
    -                 |has failed the maximum allowable number of
    -                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
    -                 |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
    -            }
    -            abortStage(failedStage, abortMessage, None)
    -          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
    -            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
    -            val noResubmitEnqueued = !failedStages.contains(failedStage)
    -            failedStages += failedStage
    -            failedStages += mapStage
    -            if (noResubmitEnqueued) {
    -              // We expect one executor failure to trigger many FetchFailures in rapid succession,
    -              // but all of those task failures can typically be handled by a single resubmission of
    -              // the failed stage. We avoid flooding the scheduler's event queue with resubmit
    -              // messages by checking whether a resubmit is already in the event queue for the
    -              // failed stage. If there is already a resubmit enqueued for a different failed
    -              // stage, that event would also be sufficient to handle the current failed stage, but
    -              // producing a resubmit for each failed stage makes debugging and logging a little
    -              // simpler while not producing an overwhelming number of scheduler events.
    -              logInfo(
    -                s"Resubmitting $mapStage (${mapStage.name}) and " +
    -                s"$failedStage (${failedStage.name}) due to fetch failure"
    -              )
    -              messageScheduler.schedule(
    -                new Runnable {
    -                  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    -                },
    -                DAGScheduler.RESUBMIT_TIMEOUT,
    -                TimeUnit.MILLISECONDS
    -              )
    -            }
    -          }
    +        val epochForMapOutput = mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
    +        // It is possible that the map output was regenerated by rerun of the stage and the
    +        // fetch failure is being reported for stale map output. In that case, we should just
    +        // ignore the fetch failure and relaunch the task with latest map output info.
    +        if (epochForMapOutput.nonEmpty && epochForMapOutput.get <= task.epoch) {
    --- End diff --

I'd be inclined to do this without the extra binding and `get`:

```scala
for (epochForMapOutput <- mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
    if epochForMapOutput <= task.epoch) {
  // Mark the map whose fetch failed as broken in the map stage
  if (mapId != -1) {
    mapStage.removeOutputLoc(mapId, bmAddress)
```
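The new guard can be isolated as a pure predicate. The sketch below uses hypothetical names and assumes, following the diff comment, that an output epoch newer than the task's epoch means the map output was regenerated after the task launched, so the fetch failure refers to stale output and is ignored. It shows that `exists` and the `for`-with-guard shape agree with the explicit `nonEmpty`/`get` check.

```scala
// Hypothetical helper object isolating the condition from the diff.
object FetchFailureEpochSketch {
  // Form in the diff: explicit nonEmpty check plus get.
  def shouldHandleExplicit(epochForMapOutput: Option[Long], taskEpoch: Long): Boolean =
    epochForMapOutput.nonEmpty && epochForMapOutput.get <= taskEpoch

  // Same condition without the intermediate binding or get.
  def shouldHandle(epochForMapOutput: Option[Long], taskEpoch: Long): Boolean =
    epochForMapOutput.exists(_ <= taskEpoch)

  // The for-with-guard shape: `handle` runs only when the Option is defined
  // and the recorded epoch is not newer than the task's epoch.
  def handleIfCurrent(epochForMapOutput: Option[Long], taskEpoch: Long)(handle: => Unit): Unit =
    for (e <- epochForMapOutput if e <= taskEpoch) handle
}
```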
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r106774683

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -193,13 +193,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
           val stageTaskSets =
             taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
           stageTaskSets(taskSet.stageAttemptId) = manager
    -      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
    --- End diff --

@squito - That's correct: this check ensures that we do not have more than one non-zombie attempt of a stage running. But in the scenario in (d) you described below, we will end up having more than two non-zombie attempts. However, my point is that there is no reason we should not allow multiple concurrent attempts of a stage to run; the only thing we should guarantee is that the tasks running in those attempts are mutually exclusive. With this change, since the DAGScheduler already keeps track of submitted/running tasks, it can guarantee that it will not resubmit duplicate tasks for a stage.
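To make the proposed relaxation concrete: several non-zombie attempts could coexist as long as no partition belongs to more than one of them. This is a hypothetical check (`AttemptSketch` is not Spark's `TaskSetManager`) that only illustrates the property being argued for, not code from the patch.

```scala
// Hypothetical model of one stage attempt and the partitions it will run.
final case class AttemptSketch(attemptId: Int, isZombie: Boolean, partitions: Set[Int])

object ConcurrentAttemptsSketch {
  // True when every partition belongs to at most one non-zombie attempt,
  // i.e. the active attempts never run the same partition twice.
  def activeAttemptsAreDisjoint(attempts: Seq[AttemptSketch]): Boolean = {
    val activePartitions = attempts.filterNot(_.isZombie).flatMap(_.partitions.toSeq)
    activePartitions.distinct.size == activePartitions.size
  }
}
```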
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r106774285

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -193,13 +193,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
           val stageTaskSets =
             taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
           stageTaskSets(taskSet.stageAttemptId) = manager
    -      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
    --- End diff --

Actually, that is not really the point of this check. It's just checking whether one stage has two tasksets (aka stage attempts) that are both in the "non-zombie" state. It doesn't do any checks at all on what tasks are actually in those tasksets. This is just checking an invariant which we believe to always be true, but we figure it's better to fail fast if we hit this condition rather than proceed with some inconsistent state. This check was added because behavior gets *really* confusing when the invariant is violated, and though we think it should always be true, we've still hit cases where it happens.
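The invariant squito describes looks only at zombie flags, never at which tasks each attempt contains. A minimal sketch under assumed, simplified types (`TaskSetSketch` and `ZombieInvariantSketch` are hypothetical, loosely shaped after the removed lines) that registers an attempt and then fails fast if any other attempt of the same stage is still non-zombie:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a stage attempt's task set; only the zombie flag matters.
final class TaskSetSketch(val stageId: Int, val stageAttemptId: Int) {
  var isZombie: Boolean = false
}

class ZombieInvariantSketch {
  private val taskSetsByStageIdAndAttempt =
    mutable.HashMap[Int, mutable.HashMap[Int, TaskSetSketch]]()

  // Register the attempt, then throw if any *other* attempt of the same stage
  // is still non-zombie, rather than continuing with inconsistent state.
  def submit(taskSet: TaskSetSketch): Unit = {
    val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(
      taskSet.stageId, mutable.HashMap[Int, TaskSetSketch]())
    stageTaskSets(taskSet.stageAttemptId) = taskSet
    val conflicting = stageTaskSets.exists { case (_, ts) => (ts ne taskSet) && !ts.isZombie }
    if (conflicting) {
      throw new IllegalStateException(
        s"more than one active task set for stage ${taskSet.stageId}")
    }
  }
}
```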
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/17297#discussion_r106315206

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -193,13 +193,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
           val stageTaskSets =
             taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
           stageTaskSets(taskSet.stageAttemptId) = manager
    -      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
    --- End diff --

Please note that this check is no longer needed, because the DAGScheduler already keeps track of running tasks and does not submit duplicate tasks.
[GitHub] spark pull request #17297: [SPARK-14649][CORE] DagScheduler should not run d...
GitHub user sitalkedia opened a pull request:

https://github.com/apache/spark/pull/17297

[SPARK-14649][CORE] DagScheduler should not run duplicate tasks on fe…

## What changes were proposed in this pull request?

When a fetch failure occurs, the DAGScheduler re-launches the previous stage (to re-generate the missing output), and then re-launches all tasks in the stage with the fetch failure that hadn't completed when the fetch failure occurred (the DAGScheduler re-launches all of the tasks whose output data is not available, which is equivalent to the set of tasks that hadn't yet completed). This sometimes leads to wasteful duplicate task runs for jobs with long-running tasks.

To address the issue, the following changes have been made:

1. When a fetch failure happens, the task set manager asks the DAGScheduler to abort all the non-running tasks. However, the running tasks in the task set are not killed.
2. When a task is aborted, the DAGScheduler adds the task to the pending task list.
3. When the stage is resubmitted, the DAGScheduler only resubmits the tasks that are pending.

## How was this patch tested?

Added new tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sitalkedia/spark avoid_duplicate_tasks_new

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17297.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17297

commit e5429d309801bffb8ddc907fb4800efb6fb1a2fa
Author: Sital Kedia
Date: 2016-04-15T23:44:23Z

    [SPARK-14649][CORE] DagScheduler should not run duplicate tasks on fetch failure
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
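The three steps in the description can be modeled with a single stage. The sketch below is hypothetical (`StageModel` and its methods are not Spark APIs) and loosely follows the `handleTasksAborted` and `submitMissingTasks` hunks quoted in this thread: `pendingPartitions` holds partitions that were submitted but have not finished, aborting a not-yet-running task drops its partition from that set, and resubmission computes only partitions that are missing and not pending, so a still-running task is not launched a second time.

```scala
import scala.collection.mutable

// Hypothetical single-stage model; names are illustrative, not Spark's.
class StageModel(numPartitions: Int) {
  val finished = mutable.HashSet[Int]()          // partitions whose output is available
  val pendingPartitions = mutable.HashSet[Int]() // submitted and not yet finished
  val running = mutable.HashSet[Int]()           // currently executing

  def findMissingPartitions(): Seq[Int] = (0 until numPartitions).filterNot(finished)

  // Submit only partitions that are missing and not already pending.
  def submitMissingTasks(): Seq[Int] = {
    val toCompute = findMissingPartitions().filterNot(pendingPartitions)
    pendingPartitions ++= toCompute
    toCompute
  }

  def start(p: Int): Unit = running += p
  def finish(p: Int): Unit = { running -= p; pendingPartitions -= p; finished += p }

  // On a fetch failure, abort submitted-but-not-running tasks (they stop being
  // pending) and leave running tasks alone. Returns the aborted partitions.
  def onFetchFailure(): Seq[Int] = {
    val aborted = pendingPartitions.filterNot(running).toSeq.sorted
    pendingPartitions --= aborted
    aborted
  }
}
```

In the scenario checked below, partition 1 is still running when the failure arrives, so the resubmission covers only partitions 2 and 3.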