Repository: spark
Updated Branches:
  refs/heads/master 30e980ad8 -> 3e29e372f


[SPARK-14468] Always enable OutputCommitCoordinator

## What changes were proposed in this pull request?

`OutputCommitCoordinator` was introduced to prevent concurrent task attempts 
from racing to commit the same output, which can lead to data loss or corruption. 
For more detail, read the [JIRA description](https://issues.apache.org/jira/browse/SPARK-14468).

Before: `OutputCommitCoordinator` is enabled only if speculation is enabled.
After: `OutputCommitCoordinator` is always enabled.

Users may still disable this through 
`spark.hadoop.outputCommitCoordination.enabled`, but they really shouldn't...
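
For illustration, a minimal sketch of that escape hatch (only the config key comes 
from this patch; the master and app name below are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Escape hatch: opt out of output commit coordination entirely.
// After this patch the key defaults to "true"; setting it to "false"
// restores uncoordinated commits (not recommended).
val conf = new SparkConf()
  .setMaster("local[2]")          // placeholder master
  .setAppName("occ-escape-hatch") // placeholder app name
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")
val sc = new SparkContext(conf)
```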

## How was this patch tested?

`OutputCommitCoordinator*Suite`

Author: Andrew Or <and...@databricks.com>

Closes #12244 from andrewor14/always-occ.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e29e372
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e29e372
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e29e372

Branch: refs/heads/master
Commit: 3e29e372ff518827bae9dcd26087946fde476843
Parents: 30e980a
Author: Andrew Or <and...@databricks.com>
Authored: Thu Apr 7 17:49:39 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Apr 7 17:49:39 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/mapred/SparkHadoopMapRedUtil.scala | 16 ++++++----------
 .../OutputCommitCoordinatorIntegrationSuite.scala   |  2 +-
 .../scheduler/OutputCommitCoordinatorSuite.scala    |  2 +-
 3 files changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3e29e372/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 891facb..607283a 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -33,11 +33,8 @@ object SparkHadoopMapRedUtil extends Logging {
   * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
   * details).
   *
-   * Output commit coordinator is only contacted when the following two configurations are both set
-   * to `true`:
-   *
-   *  - `spark.speculation`
-   *  - `spark.hadoop.outputCommitCoordination.enabled`
+   * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
+   * is set to true (which is the default).
   */
   def commitTask(
       committer: MapReduceOutputCommitter,
@@ -64,11 +61,10 @@ object SparkHadoopMapRedUtil extends Logging {
     if (committer.needsTaskCommit(mrTaskContext)) {
       val shouldCoordinateWithDriver: Boolean = {
         val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+        // We only need to coordinate with the driver if there are concurrent task attempts.
+        // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
+        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
       }
 
       if (shouldCoordinateWithDriver) {
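
For readers skimming the diff: the driver-side arbitration that 
`shouldCoordinateWithDriver` opts into is essentially first-attempt-wins per 
output partition. The class below is a hypothetical, single-JVM sketch of that 
idea, not the real `OutputCommitCoordinator` (which runs on the driver and is 
consulted over RPC):

```scala
import scala.collection.mutable

// Hypothetical sketch of first-attempt-wins commit arbitration.
class NaiveCommitCoordinator {
  // (stage, partition) -> attempt number that was authorized to commit
  private val authorized = mutable.Map.empty[(Int, Int), Int]

  def canCommit(stage: Int, partition: Int, attemptNumber: Int): Boolean =
    synchronized {
      authorized.get((stage, partition)) match {
        case Some(winner) => winner == attemptNumber // re-asking is idempotent
        case None =>
          authorized((stage, partition)) = attemptNumber // first asker wins
          true
      }
    }
}
```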

http://git-wip-us.apache.org/repos/asf/spark/blob/3e29e372/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index 9f41aca..601f1c3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -38,7 +38,7 @@ class OutputCommitCoordinatorIntegrationSuite
     super.beforeAll()
     val conf = new SparkConf()
       .set("master", "local[2,4]")
-      .set("spark.speculation", "true")
+      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
       .set("spark.hadoop.mapred.output.committer.class",
         classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
     sc = new SparkContext("local[2, 4]", "test", conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/3e29e372/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index c461da6..8e509de 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -77,7 +77,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
-      .set("spark.speculation", "true")
+      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
     sc = new SparkContext(conf) {
       override private[spark] def createSparkEnv(
           conf: SparkConf,

