[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
Github user vanzin closed the pull request at: https://github.com/apache/spark/pull/21607
[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/21607#discussion_r197508752

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -153,25 +163,26 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   // Marked private[scheduler] instead of private so this can be mocked in tests
   private[scheduler] def handleAskPermissionToCommit(
-      stage: StageId,
-      partition: PartitionId,
-      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
-        authorizedCommitters(partition) match {
-          case NO_AUTHORIZED_COMMITTER =>
-            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition")
-            authorizedCommitters(partition) = attemptNumber
-            true
-          case existingCommitter =>
-            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition; existingCommitter = $existingCommitter")
-            false
+        val existing = authorizedCommitters(partition)
+        if (existing == null) {
+          logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
--- End diff --

Nit: this should log `$stage.$stageAttempt` so the stage attempt is visible too.
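For reference, the suggestion amounts to including the stage attempt in the logged stage identifier, along these lines (a sketch only; the variables come from the surrounding method in the diff above):

    // Sketch of the suggested change: key the log message by "stageId.stageAttempt"
    // so messages from different attempts of the same stage are distinguishable.
    logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for " +
      s"stage=$stage.$stageAttempt, partition=$partition")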
[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21607#discussion_r197499240

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -97,48 +102,46 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   }

   /**
-   * Called by the DAGScheduler when a stage starts.
+   * Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't
+   * yet been initialized.
    *
    * @param stage the stage id.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(
-      stage: StageId,
-      maxPartitionId: Int): Unit = {
-    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
-    synchronized {
-      authorizedCommittersByStage(stage) = arr
-    }
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+    val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
--- End diff --

I'll adjust the test too to catch this.
[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21607#discussion_r197499100

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -97,48 +102,46 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+    val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
--- End diff --

Oh, right, d'oh. No need to backport that change; the logic is pretty simple.
[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/21607#discussion_r197449681

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -97,48 +102,46 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+    val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
--- End diff --

We are missing the logic here to handle reuse of state across multiple stage attempts. I think the only difference is SPARK-19631; I wonder if that is easy to pull back?
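A minimal sketch of what reuse-aware initialization would look like in this backport's data structures (names taken from the diff above; this assumes the surrounding class's `authorizedCommittersByStage` map and `TaskIdentifier` type, and is illustrative rather than the committed patch):

    // Sketch: initialize the committer array only on the first stage attempt,
    // so a retried stage keeps the committers authorized by earlier attempts.
    private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
      if (!authorizedCommittersByStage.contains(stage)) {
        authorizedCommittersByStage(stage) = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
      }
    }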
[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/21607#discussion_r197315441

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -97,48 +102,48 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   }

   /**
-   * Called by the DAGScheduler when a stage starts.
+   * Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't
+   * yet been initialized.
    *
    * @param stage the stage id.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(
-      stage: StageId,
-      maxPartitionId: Int): Unit = {
-    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+    val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
     synchronized {
--- End diff --

We have two nested `synchronized` blocks here.
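The intended shape is a single lock on the method, with the leftover inner `synchronized {` block from the pre-backport code dropped. A sketch, under the same assumptions as above:

    // Sketch: the method-level synchronized is the only lock; the array write
    // no longer needs its own synchronized block because the method holds the lock.
    private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
      val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
      authorizedCommittersByStage(stage) = arr
    }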
[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21607#discussion_r197300379

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
       logInfo(message)
       // We need to abort the task so that the driver can reschedule new attempts, if necessary
       committer.abortTask(mrTaskContext)
-      throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+      throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
--- End diff --

2.2 and later use the stage ID; I think it's more correct since that's what the coordinator uses, but I don't really mind either way.
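The rationale: the coordinator keys its per-stage state by stage ID, so the denial should report the same identifier that was actually checked. A hedged sketch of the identifiers the task side gathers (the `TaskContext` accessors are real API; the direct coordinator access is simplified here, since it is `private[spark]` in real Spark code, and `canCommit`'s four-argument form follows the backported coordinator):

    import org.apache.spark.{SparkEnv, TaskContext}

    // Sketch: the identifiers a task-side commit check hands to the
    // driver-side OutputCommitCoordinator.
    val ctx: TaskContext = TaskContext.get()
    val allowed = SparkEnv.get.outputCommitCoordinator.canCommit(
      ctx.stageId(), ctx.stageAttemptNumber(), ctx.partitionId(), ctx.attemptNumber())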
[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21607#discussion_r197299435

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
-      throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+      throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
--- End diff --

Hmm, shall we still keep `jobId` here?
[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897
GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/21607

    branch-2.1: backport SPARK-24589 and SPARK-22897

*Please do not merge this PR directly.* I'm posting this to speed up testing and review; these should go in as two separate commits. I'll address any test failures and review feedback here, but will push the changes separately when everything is in good shape. See the individual commits for each change.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-24589-2.1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21607.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21607

commit e5ccac21db69a5698e70d8fb993296fa854de132
Author: Xianjin YE
Date: 2018-01-02T15:30:38Z

    [SPARK-22897][CORE] Expose stageAttemptId in TaskContext

    stageAttemptId was added to TaskContext, with the corresponding constructor changes. A new test was added in TaskContextSuite covering two cases:

    1. the normal case, without failure
    2. the exception case, with resubmitted stages

    Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)

    Author: Xianjin YE

    Closes #20082 from advancedxy/SPARK-22897.

    (cherry picked from commit a6fc300e91273230e7134ac6db95ccb4436c6f8f)
    Signed-off-by: Marcelo Vanzin

commit 034fe94d544d5235eac70e38c7891c7110dbb569
Author: Marcelo Vanzin
Date: 2018-06-21T18:25:15Z

    [SPARK-24589][CORE] Correctly identify tasks in output commit coordinator [branch-2.1].

    When an output stage is retried, it's possible that tasks from the previous attempt are still running. In that case, there would be a new task for the same partition in the new attempt, and the coordinator would allow both tasks to commit their output, since it did not keep track of stage attempts.

    The change adds more information to the stage state tracked by the coordinator, so that only one task is allowed to commit the output in the above case. The stage state in the coordinator is also maintained across stage retries, so that a stray speculative task from a previous stage attempt is not allowed to commit.

    This also removes some code added in SPARK-18113 that allowed for duplicate commit requests; with the RPC code used in Spark 2, that situation cannot happen, so there is no need to handle it.

    Author: Marcelo Vanzin

    Closes #21577 from vanzin/SPARK-24552.

    (cherry picked from commit c8e909cd498b67b121fa920ceee7631c652dac38)
    Signed-off-by: Thomas Graves
    (cherry picked from commit 751b008204a847e26d79b1996ce3f3dbe96a5acf)
    Signed-off-by: Marcelo Vanzin
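The core of the SPARK-24589 change is tracking the authorized committer by both stage attempt and task attempt, rather than by task attempt alone. A condensed, self-contained sketch of that idea (the `TaskIdentifier` name follows the upstream commit; `CommitTracker` is a simplified stand-in for the real coordinator, not the backported class):

    import scala.collection.mutable

    // Identify a committer by stage attempt and task attempt, so a task from a
    // stale stage attempt cannot race a retried stage's task for the same partition.
    case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

    class CommitTracker {
      // One slot per partition; null means no committer has been authorized yet.
      private val authorizedCommittersByStage = mutable.Map[Int, Array[TaskIdentifier]]()

      def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
        // Keep existing state on stage retry so earlier authorizations survive.
        if (!authorizedCommittersByStage.contains(stage)) {
          authorizedCommittersByStage(stage) = new Array[TaskIdentifier](maxPartitionId + 1)
        }
      }

      // Only the first (stageAttempt, taskAttempt) to ask for a partition wins.
      def canCommit(stage: Int, stageAttempt: Int, partition: Int, attempt: Int): Boolean =
        synchronized {
          authorizedCommittersByStage.get(stage) match {
            case Some(committers) if committers(partition) == null =>
              committers(partition) = TaskIdentifier(stageAttempt, attempt)
              true
            case _ =>
              false
          }
        }
    }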