[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-22 Thread vanzin
Github user vanzin closed the pull request at:

https://github.com/apache/spark/pull/21607


---




[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-22 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/21607#discussion_r197508752
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -153,25 +163,26 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
 
   // Marked private[scheduler] instead of private so this can be mocked in tests
   private[scheduler] def handleAskPermissionToCommit(
-      stage: StageId,
-      partition: PartitionId,
-      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
-        authorizedCommitters(partition) match {
-          case NO_AUTHORIZED_COMMITTER =>
-            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition")
-            authorizedCommitters(partition) = attemptNumber
-            true
-          case existingCommitter =>
-            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition; existingCommitter = $existingCommitter")
-            false
+        val existing = authorizedCommitters(partition)
+        if (existing == null) {
+          logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
--- End diff --

The log message should use $stage.$stageAttempt so the stage attempt is visible too.
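
A minimal sketch of the suggested log line, using the names from the diff above (illustrative only, not the merged wording):

    logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for " +
      s"stage=$stage.$stageAttempt, partition=$partition")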



---




[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21607#discussion_r197499240
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -97,48 +102,46 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   }
 
   /**
-   * Called by the DAGScheduler when a stage starts.
+   * Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't
+   * yet been initialized.
    *
    * @param stage the stage id.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(
-      stage: StageId,
-      maxPartitionId: Int): Unit = {
-    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
-    synchronized {
-      authorizedCommittersByStage(stage) = arr
-    }
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+    val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
--- End diff --

I'll adjust the test too to catch this.
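
A hedged sketch of the kind of test that would catch this, assuming a coordinator fixture and the post-patch canCommit(stage, stageAttempt, partition, attemptNumber) signature shown in the diffs above (illustrative names, not the actual suite code):

    test("stage retry does not clear existing committer state (sketch)") {
      val coordinator = outputCommitCoordinator  // hypothetical test fixture
      coordinator.stageStart(stage = 1, maxPartitionId = 1)
      // The first task attempt for partition 0 is granted the commit.
      assert(coordinator.canCommit(1, 0, 0, 0))
      // A stage retry calls stageStart() again; it must not wipe the state...
      coordinator.stageStart(stage = 1, maxPartitionId = 1)
      // ...so a task from the new stage attempt is still denied for partition 0.
      assert(!coordinator.canCommit(1, 1, 0, 1))
    }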


---




[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21607#discussion_r197499100
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -97,48 +102,46 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   }
 
   /**
-   * Called by the DAGScheduler when a stage starts.
+   * Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't
+   * yet been initialized.
    *
    * @param stage the stage id.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(
-      stage: StageId,
-      maxPartitionId: Int): Unit = {
-    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
-    synchronized {
-      authorizedCommittersByStage(stage) = arr
-    }
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+    val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
--- End diff --

Oh, right, d'oh. No need to backport that change, the logic is pretty 
simple.


---




[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-22 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/21607#discussion_r197449681
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -97,48 +102,46 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   }
 
   /**
-   * Called by the DAGScheduler when a stage starts.
+   * Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't
+   * yet been initialized.
    *
    * @param stage the stage id.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(
-      stage: StageId,
-      maxPartitionId: Int): Unit = {
-    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
-    synchronized {
-      authorizedCommittersByStage(stage) = arr
-    }
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+    val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
--- End diff --

We are missing the logic here that handles multiple stage attempts by reusing the existing state (see the sketch below). I think the only diff is SPARK-19631; I wonder if that is easy to pull back?
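
For reference, a minimal sketch of the retry-aware initialization being asked about (modeled on what the newer branches do; names follow the diff above, not necessarily the final backport):

    private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
      authorizedCommittersByStage.get(stage) match {
        case Some(existing) =>
          // Stage retry: keep the committers already authorized by earlier attempts.
          require(existing.length == maxPartitionId + 1)
        case None =>
          authorizedCommittersByStage(stage) = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
      }
    }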


---




[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-21 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/21607#discussion_r197315441
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -97,48 +102,48 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   }
 
   /**
-   * Called by the DAGScheduler when a stage starts.
+   * Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't
+   * yet been initialized.
    *
    * @param stage the stage id.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(
-      stage: StageId,
-      maxPartitionId: Int): Unit = {
-    val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
-    java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
+  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+    val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
     synchronized {
--- End diff --

We have two nested synchronized blocks here.
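
The fix, reflected in the later revision of this diff quoted further up the thread, is to make the whole method body synchronized and drop the inner block:

    private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
      val arr = Array.fill[TaskIdentifier](maxPartitionId + 1)(null)
      authorizedCommittersByStage(stage) = arr
    }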


---




[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-21 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21607#discussion_r197300379
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
       logInfo(message)
       // We need to abort the task so that the driver can reschedule new attempts, if necessary
       committer.abortTask(mrTaskContext)
-      throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+      throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
--- End diff --

2.2 and later use the stage ID; I think it's more correct since that's what 
the coordinator uses, but don't really mind either way.
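
For context, a paraphrased sketch of the exception's shape (not verbatim source): the second parameter is declared as a job ID but is forwarded unchanged into the task failure reason, so whichever ID is passed above is what the driver reports and what has to line up with the coordinator's bookkeeping.

    private[spark] class CommitDeniedException(
        msg: String,
        jobID: Int,
        splitID: Int,
        attemptNumber: Int)
      extends Exception(msg) {
      def toTaskCommitDeniedReason: TaskCommitDenied = TaskCommitDenied(jobID, splitID, attemptNumber)
    }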


---




[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21607#discussion_r197299435
  
--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
       logInfo(message)
       // We need to abort the task so that the driver can reschedule new attempts, if necessary
       committer.abortTask(mrTaskContext)
-      throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+      throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
--- End diff --

hmm, shall we still keep `jobId` here?


---




[GitHub] spark pull request #21607: branch-2.1: backport SPARK-24589 and SPARK-22897

2018-06-21 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/21607

branch-2.1: backport SPARK-24589 and SPARK-22897

*Please do not merge this PR directly.*

I'm posting this to speed up testing and review. These should go in as two 
separate commits. I'll address any test failures and review feedback here, but 
will push the changes separately when everything is in good shape.

See individual commits for each individual change.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-24589-2.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21607.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21607


commit e5ccac21db69a5698e70d8fb993296fa854de132
Author: Xianjin YE 
Date:   2018-01-02T15:30:38Z

[SPARK-22897][CORE] Expose stageAttemptId in TaskContext

Added stageAttemptId to TaskContext, along with the corresponding constructor changes.

Added a new test in TaskContextSuite; two cases are tested:
1. Normal case without failure
2. Exception case with resubmitted stages

Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)

Author: Xianjin YE 

Closes #20082 from advancedxy/SPARK-22897.

(cherry picked from commit a6fc300e91273230e7134ac6db95ccb4436c6f8f)
Signed-off-by: Marcelo Vanzin 
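
To illustrate what the SPARK-22897 commit above exposes, a hedged usage sketch; it assumes the accessor is named stageAttemptNumber(), as in the upstream commit being cherry-picked:

    import org.apache.spark.{SparkContext, TaskContext}

    def logTaskIdentity(sc: SparkContext): Unit = {
      sc.parallelize(1 to 4, 2).foreach { _ =>
        val ctx = TaskContext.get()
        // stageId + stageAttemptNumber + partitionId + attemptNumber together
        // identify a task attempt, which is what SPARK-24589 below relies on.
        println(s"stage=${ctx.stageId()}.${ctx.stageAttemptNumber()} " +
          s"partition=${ctx.partitionId()} attempt=${ctx.attemptNumber()}")
      }
    }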

commit 034fe94d544d5235eac70e38c7891c7110dbb569
Author: Marcelo Vanzin 
Date:   2018-06-21T18:25:15Z

[SPARK-24589][CORE] Correctly identify tasks in output commit coordinator 
[branch-2.1].

When an output stage is retried, it's possible that tasks from the previous
attempt are still running. In that case, there would be a new task for the
same partition in the new attempt, and the coordinator would allow both
tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the 
coordinator,
so that only one task is allowed to commit the output in the above case.
The stage state in the coordinator is also maintained across stage retries,
so that a stray speculative task from a previous stage attempt is not 
allowed
to commit.

This also removes some code added in SPARK-18113 that allowed for duplicate
commit requests; with the RPC code used in Spark 2, that situation cannot
happen, so there is no need to handle it.

Author: Marcelo Vanzin 

Closes #21577 from vanzin/SPARK-24552.

(cherry picked from commit c8e909cd498b67b121fa920ceee7631c652dac38)
Signed-off-by: Thomas Graves 
(cherry picked from commit 751b008204a847e26d79b1996ce3f3dbe96a5acf)
Signed-off-by: Marcelo Vanzin 
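
To make the description above concrete, a minimal sketch of the tracking scheme (modeled on the upstream change; the exact types in the 2.1 backport may differ):

    // A committer is identified by both the stage attempt and the task attempt, so an
    // attempt from a previous stage attempt can no longer collide with the current one.
    private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

    // Per stage, one slot per partition; null means "no authorized committer yet".
    // This mirrors the Array[TaskIdentifier] seen in the diffs above.
    def handleAskPermission(
        authorizedCommitters: Array[TaskIdentifier],
        stageAttempt: Int,
        partition: Int,
        attemptNumber: Int): Boolean = {
      val existing = authorizedCommitters(partition)
      if (existing == null) {
        authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
        true
      } else {
        false  // another attempt (possibly from an older stage attempt) already holds it
      }
    }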




---
