Repository: spark
Updated Branches:
  refs/heads/master 5db35b312 -> c71b25481


[SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API

## What changes were proposed in this pull request?

Currently in SQL we implement overwrites by calling fs.delete() directly on the
original data. This is not ideal since the original files end up deleted even if
the job aborts. We should extend the commit protocol to allow file overwrites to
be managed as well.
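
To make the idea concrete, here is a minimal, hypothetical sketch of a protocol that uses the new hook to defer deletes until the job commits. The `DeferredDeleteCommitProtocol` name and the assumption that the base class takes a `(jobId, path)` constructor are illustrative only and not part of this patch; the default `deleteWithJob` added below still deletes immediately.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.JobContext

import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical committer: remember the paths the job wants deleted and only
// remove them once the job has committed, so an aborted overwrite leaves the
// original files untouched.
class DeferredDeleteCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  // (path URI, recursive) pairs recorded on the driver before the write starts.
  private val pendingDeletes = ArrayBuffer.empty[(String, Boolean)]

  override def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
    // Defer the delete instead of calling fs.delete(path, recursive) eagerly.
    pendingDeletes += ((path.toUri.toString, recursive))
    true
  }

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    super.commitJob(jobContext, taskCommits)
    // Only drop the old data once the new output is committed. A real
    // implementation would also have to ensure the deferred paths cannot
    // overlap with the files the job just wrote.
    val conf = jobContext.getConfiguration
    pendingDeletes.foreach { case (p, recursive) =>
      val toDelete = new Path(p)
      toDelete.getFileSystem(conf).delete(toDelete, recursive)
    }
  }

  override def abortJob(jobContext: JobContext): Unit = {
    super.abortJob(jobContext)
    pendingDeletes.clear() // nothing is deleted if the job aborts
  }
}
```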

## How was this patch tested?

Existing tests. I also fixed a number of tests that depended on the commit
protocol implementation being set to the legacy MapReduce one.
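
For reference, those tests now pin the protocol explicitly rather than assuming the session default, along the lines of this sketch (mirroring the withSQLConf blocks in the diff below; withSQLConf comes from SQLTestUtils, which these suites already mix in):

```scala
// Pin the commit protocol for tests that assert on OutputCommitter behaviour,
// so they no longer depend on whatever protocol the session is configured with.
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
    classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
  // test body that exercises a custom or intentionally failing OutputCommitter
}
```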

cc rxin cloud-fan

Author: Eric Liang <e...@databricks.com>
Author: Eric Liang <ekhli...@gmail.com>

Closes #16554 from ericl/add-delete-protocol.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c71b2548
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c71b2548
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c71b2548

Branch: refs/heads/master
Commit: c71b25481aa5f7bc27d5c979e66bed54cd46b97e
Parents: 5db35b3
Author: Eric Liang <e...@databricks.com>
Authored: Thu Jan 12 17:45:55 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Jan 12 17:45:55 2017 +0800

----------------------------------------------------------------------
 .../spark/internal/io/FileCommitProtocol.scala  |   9 ++
 .../InsertIntoHadoopFsRelationCommand.scala     |  25 ++--
 .../datasources/HadoopFsRelationSuite.scala     |   2 +-
 .../datasources/parquet/ParquetIOSuite.scala    | 122 ++++++++++---------
 .../ParquetPartitionDiscoverySuite.scala        |   9 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   5 +
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala |   4 +-
 .../sql/sources/HadoopFsRelationTest.scala      |  77 ++++++------
 .../sources/ParquetHadoopFsRelationSuite.scala  |   6 +-
 9 files changed, 149 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index afd2250..2394cf3 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.internal.io
 
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.util.Utils
@@ -112,6 +113,14 @@ abstract class FileCommitProtocol {
    * just crashes (or killed) before it can call abort.
    */
   def abortTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Specifies that a file should be deleted with the commit of this job. The default
+   * implementation deletes the file immediately.
+   */
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+    fs.delete(path, recursive)
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 423009e..652bcc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -88,11 +88,20 @@ case class InsertIntoHadoopFsRelationCommand(
     }
 
     val pathExists = fs.exists(qualifiedOutputPath)
+    // If we are appending data to an existing dir.
+    val isAppend = pathExists && (mode == SaveMode.Append)
+
+    val committer = FileCommitProtocol.instantiate(
+      sparkSession.sessionState.conf.fileCommitProtocolClass,
+      jobId = java.util.UUID.randomUUID().toString,
+      outputPath = outputPath.toString,
+      isAppend = isAppend)
+
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
      case (SaveMode.Overwrite, true) =>
-        deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations)
+        deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
        true
      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
@@ -101,15 +110,8 @@ case class InsertIntoHadoopFsRelationCommand(
       case (s, exists) =>
         throw new IllegalStateException(s"unsupported save mode $s ($exists)")
     }
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
-      val committer = FileCommitProtocol.instantiate(
-        sparkSession.sessionState.conf.fileCommitProtocolClass,
-        jobId = java.util.UUID.randomUUID().toString,
-        outputPath = outputPath.toString,
-        isAppend = isAppend)
 
      // Callback for updating metastore partition metadata after the insertion job completes.
      def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
@@ -160,7 +162,8 @@ case class InsertIntoHadoopFsRelationCommand(
   private def deleteMatchingPartitions(
       fs: FileSystem,
       qualifiedOutputPath: Path,
-      customPartitionLocations: Map[TablePartitionSpec, String]): Unit = {
+      customPartitionLocations: Map[TablePartitionSpec, String],
+      committer: FileCommitProtocol): Unit = {
     val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
       "/" + partitionColumns.flatMap { p =>
         staticPartitions.get(p.name) match {
@@ -175,7 +178,7 @@ case class InsertIntoHadoopFsRelationCommand(
     }
    // first clear the path determined by the static partition keys (e.g. /table/foo=1)
    val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
+    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
       throw new IOException(s"Unable to clear output " +
         s"directory $staticPrefixPath prior to writing to it")
     }
@@ -185,7 +188,7 @@ case class InsertIntoHadoopFsRelationCommand(
         (staticPartitions.toSet -- spec).isEmpty,
         "Custom partition location did not match static partitioning keys")
       val path = new Path(customLoc)
-      if (fs.exists(path) && !fs.delete(path, true)) {
+      if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
         throw new IOException(s"Unable to clear partition " +
           s"directory $path prior to writing to it")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index 7679e85..becb3aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -31,7 +31,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
       // ignore hidden files
       val allFiles = dir.listFiles(new FilenameFilter {
         override def accept(dir: File, name: String): Boolean = {
-          !name.startsWith(".")
+          !name.startsWith(".") && !name.startsWith("_")
         }
       })
       val totalSize = allFiles.map(_.length()).sum

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index acdadb3..dbdcd23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -462,16 +463,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
-    val extraOptions = Map(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
-      "spark.sql.parquet.output.committer.class" ->
-        classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
-    )
-    withTempPath { dir =>
-      val message = intercept[SparkException] {
-        spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-      }.getCause.getMessage
-      assert(message === "Intentional exception for testing purposes")
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      val extraOptions = Map(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
+        "spark.sql.parquet.output.committer.class" ->
+          classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
+      )
+      withTempPath { dir =>
+        val message = intercept[SparkException] {
+          spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(message === "Intentional exception for testing purposes")
+      }
     }
   }
 
@@ -488,58 +492,64 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
-    // Using a output committer that always fail when committing a task, so that both
-    // `commitTask()` and `abortTask()` are invoked.
-    val extraOptions = Map[String, String](
-      "spark.sql.parquet.output.committer.class" ->
-        classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
-    )
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      // Using a output committer that always fail when committing a task, so that both
+      // `commitTask()` and `abortTask()` are invoked.
+      val extraOptions = Map[String, String](
+        "spark.sql.parquet.output.committer.class" ->
+          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
+      )
+
+      // Before fixing SPARK-7837, the following code results in an NPE because both
+      // `commitTask()` and `abortTask()` try to close output writers.
 
-    // Before fixing SPARK-7837, the following code results in an NPE because both
-    // `commitTask()` and `abortTask()` try to close output writers.
-
-    withTempPath { dir =>
-      val m1 = intercept[SparkException] {
-        spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-      }.getCause.getMessage
-      assert(m1.contains("Intentional exception for testing purposes"))
-    }
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m1.contains("Intentional exception for testing purposes"))
+      }
 
-    withTempPath { dir =>
-      val m2 = intercept[SparkException] {
-        val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1)
-        df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
-      }.getCause.getMessage
-      assert(m2.contains("Intentional exception for testing purposes"))
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1)
+          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m2.contains("Intentional exception for testing purposes"))
+      }
     }
   }
 
   test("SPARK-11044 Parquet writer version fixed as version1 ") {
-    // For dictionary encoding, Parquet changes the encoding types according to its writer
-    // version. So, this test checks one of the encoding types in order to ensure that
-    // the file is written with writer version2.
-    val extraOptions = Map[String, String](
-      // Write a Parquet file with writer version2.
-      ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
-      // By default, dictionary encoding is enabled from Parquet 1.2.0 but
-      // it is enabled just in case.
-      ParquetOutputFormat.ENABLE_DICTIONARY -> "true"
-    )
-
-    val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
-
-    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
-      withTempPath { dir =>
-        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
-        spark.range(1 << 16).selectExpr("(id % 4) AS i")
-          .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
-
-        val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
-        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
-
-        // If the file is written with version2, this should include
-        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
-        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      // For dictionary encoding, Parquet changes the encoding types according to its writer
+      // version. So, this test checks one of the encoding types in order to ensure that
+      // the file is written with writer version2.
+      val extraOptions = Map[String, String](
+        // Write a Parquet file with writer version2.
+        ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
+        // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+        // it is enabled just in case.
+        ParquetOutputFormat.ENABLE_DICTIONARY -> "true"
+      )
+
+      val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
+
+      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+        withTempPath { dir =>
+          val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+          spark.range(1 << 16).selectExpr("(id % 4) AS i")
+            .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
+
+          val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
+          val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+          // If the file is written with version2, this should include
+          // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+          assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f433a74..420cff8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -455,7 +455,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       assert(partDf.schema.map(_.name) === Seq("intField", "stringField"))
 
       path.listFiles().foreach { f =>
-        if (f.getName.toLowerCase().endsWith(".parquet")) {
+        if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) {
           // when the input is a path to a parquet file
           val df = spark.read.parquet(f.getCanonicalPath)
           assert(df.schema.map(_.name) === Seq("intField", "stringField"))
@@ -463,7 +463,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
 
       path.listFiles().foreach { f =>
-        if (f.getName.toLowerCase().endsWith(".parquet")) {
+        if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) {
          // when the input is a path to a parquet file but `basePath` is overridden to
           // the base path containing partitioning directories
           val df = spark
@@ -932,7 +932,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     withTempPath { dir =>
       val path = dir.getCanonicalPath
 
-      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+      withSQLConf(
+          ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+          "spark.sql.sources.commitProtocolClass" ->
+            classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
         spark.range(3).write.parquet(s"$path/p0=0/p1=0")
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 4c4a7d8..ee7f2d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -178,6 +179,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
 
     withSQLConf(
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName,
       SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
       SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true",
       ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true"
@@ -186,6 +189,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
 
     withSQLConf(
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName,
       SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
       SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false"
     ) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index e678cf6..4f771ca 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -93,7 +93,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         .orc(path)
 
       // Check if this is compressed as ZLIB.
-      val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
+      val maybeOrcFile = new File(path).listFiles().find { f =>
+        !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc")
+      }
       assert(maybeOrcFile.isDefined)
       val orcFilePath = maybeOrcFile.get.toPath.toString
       val expectedCompressionKind =

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 2446bed..d23b66a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.DataSourceScanExec
-import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -784,44 +784,47 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
   }
 
   test("SPARK-8578 specified custom output committer will not be used to 
append data") {
-    val extraOptions = Map[String, String](
-      SQLConf.OUTPUT_COMMITTER_CLASS.key -> 
classOf[AlwaysFailOutputCommitter].getName,
-      // Since Parquet has its own output committer setting, also set it
-      // to AlwaysFailParquetOutputCommitter at here.
-      "spark.sql.parquet.output.committer.class" ->
-        classOf[AlwaysFailParquetOutputCommitter].getName
-    )
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      val extraOptions = Map[String, String](
+        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter at here.
+        "spark.sql.parquet.output.committer.class" ->
+          classOf[AlwaysFailParquetOutputCommitter].getName
+      )
 
-    val df = spark.range(1, 10).toDF("i")
-    withTempPath { dir =>
-      df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-      // Because there data already exists,
-      // this append should succeed because we will use the output committer associated
-      // with file format and AlwaysFailOutputCommitter will not be used.
-      df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-      checkAnswer(
-        spark.read
-          .format(dataSourceName)
-          .option("dataSchema", df.schema.json)
-          .options(extraOptions)
-          .load(dir.getCanonicalPath),
-        df.union(df))
-
-      // This will fail because AlwaysFailOutputCommitter is used when we do append.
-      intercept[Exception] {
-        df.write.mode("overwrite")
-          .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
+      val df = spark.range(1, 10).toDF("i")
+      withTempPath { dir =>
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        // Because there data already exists,
+        // this append should succeed because we will use the output committer associated
+        // with file format and AlwaysFailOutputCommitter will not be used.
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          spark.read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .options(extraOptions)
+            .load(dir.getCanonicalPath),
+          df.union(df))
+
+        // This will fail because AlwaysFailOutputCommitter is used when we do append.
+        intercept[Exception] {
+          df.write.mode("overwrite")
+            .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
+        }
       }
-    }
-    withTempPath { dir =>
-      // Because there is no existing data,
-      // this append will fail because AlwaysFailOutputCommitter is used when we do append
-      // and there is no existing data.
-      intercept[Exception] {
-        df.write.mode("append")
-          .options(extraOptions)
-          .format(dataSourceName)
-          .save(dir.getCanonicalPath)
+      withTempPath { dir =>
+        // Because there is no existing data,
+        // this append will fail because AlwaysFailOutputCommitter is used when we do append
+        // and there is no existing data.
+        intercept[Exception] {
+          df.write.mode("append")
+            .options(extraOptions)
+            .format(dataSourceName)
+            .save(dir.getCanonicalPath)
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 8aa018d..03207ab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -125,7 +126,10 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   }
 
   test("SPARK-8604: Parquet data source should write summary file while doing 
appending") {
-    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+    withSQLConf(
+        ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+        SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+          classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
         val df = spark.range(0, 5).toDF()

