Repository: spark Updated Branches: refs/heads/branch-1.6 8a94a59f9 -> 77ebae367
[SPARK-14468] Always enable OutputCommitCoordinator ## What changes were proposed in this pull request? `OutputCommitCoordinator` was introduced to prevent concurrent task attempts from racing to write output, which can lead to data loss or corruption. For more details, see the [JIRA description](https://issues.apache.org/jira/browse/SPARK-14468). Before: `OutputCommitCoordinator` is enabled only if speculation is enabled. After: `OutputCommitCoordinator` is always enabled. Users may still disable it by setting `spark.hadoop.outputCommitCoordination.enabled` to `false`, but doing so is strongly discouraged because concurrent task attempts can occur even without speculation. ## How was this patch tested? `OutputCommitCoordinator*Suite` Author: Andrew Or <and...@databricks.com> Closes #12244 from andrewor14/always-occ. (cherry picked from commit 3e29e372ff518827bae9dcd26087946fde476843) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77ebae36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77ebae36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77ebae36 Branch: refs/heads/branch-1.6 Commit: 77ebae36779b39dd9f9feafafaeb36910c90b8cb Parents: 8a94a59 Author: Andrew Or <and...@databricks.com> Authored: Thu Apr 7 17:49:39 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Thu Apr 7 17:49:50 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/mapred/SparkHadoopMapRedUtil.scala | 16 ++++++---------- .../OutputCommitCoordinatorIntegrationSuite.scala | 2 +- .../scheduler/OutputCommitCoordinatorSuite.scala | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/77ebae36/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index f7298e8..aa8397c 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -81,11 +81,8 @@ object SparkHadoopMapRedUtil extends Logging { * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for * details). * - * Output commit coordinator is only contacted when the following two configurations are both set - * to `true`: - * - * - `spark.speculation` - * - `spark.hadoop.outputCommitCoordination.enabled` + * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled` + * is set to true (which is the default). */ def commitTask( committer: MapReduceOutputCommitter, @@ -112,11 +109,10 @@ object SparkHadoopMapRedUtil extends Logging { if (committer.needsTaskCommit(mrTaskContext)) { val shouldCoordinateWithDriver: Boolean = { val sparkConf = SparkEnv.get.conf - // We only need to coordinate with the driver if there are multiple concurrent task - // attempts, which should only occur if speculation is enabled - val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false) - // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs - sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled) + // We only need to coordinate with the driver if there are concurrent task attempts. + // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029). + // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs. 
+ sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true) } if (shouldCoordinateWithDriver) { http://git-wip-us.apache.org/repos/asf/spark/blob/77ebae36/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala index 1ae5b03..e0eeeb8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala @@ -38,7 +38,7 @@ class OutputCommitCoordinatorIntegrationSuite super.beforeAll() val conf = new SparkConf() .set("master", "local[2,4]") - .set("spark.speculation", "true") + .set("spark.hadoop.outputCommitCoordination.enabled", "true") .set("spark.hadoop.mapred.output.committer.class", classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName) sc = new SparkContext("local[2, 4]", "test", conf) http://git-wip-us.apache.org/repos/asf/spark/blob/77ebae36/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 7345508..bbf8874 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -78,7 +78,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val conf = new SparkConf() .setMaster("local[4]") 
.setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName) - .set("spark.speculation", "true") + .set("spark.hadoop.outputCommitCoordination.enabled", "true") sc = new SparkContext(conf) { override private[spark] def createSparkEnv( conf: SparkConf, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org