Repository: spark Updated Branches: refs/heads/master 5db35b312 -> c71b25481
[SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API ## What changes were proposed in this pull request? Currently in SQL we implement overwrites by calling fs.delete() directly on the original data. This is not ideal since the original files end up deleted even if the job aborts. We should extend the commit protocol to allow file overwrites to be managed as well. ## How was this patch tested? Existing tests. I also fixed a bunch of tests that were depending on the commit protocol implementation being set to the legacy mapreduce one. cc rxin cloud-fan Author: Eric Liang <e...@databricks.com> Author: Eric Liang <ekhli...@gmail.com> Closes #16554 from ericl/add-delete-protocol. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c71b2548 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c71b2548 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c71b2548 Branch: refs/heads/master Commit: c71b25481aa5f7bc27d5c979e66bed54cd46b97e Parents: 5db35b3 Author: Eric Liang <e...@databricks.com> Authored: Thu Jan 12 17:45:55 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Jan 12 17:45:55 2017 +0800 ---------------------------------------------------------------------- .../spark/internal/io/FileCommitProtocol.scala | 9 ++ .../InsertIntoHadoopFsRelationCommand.scala | 25 ++-- .../datasources/HadoopFsRelationSuite.scala | 2 +- .../datasources/parquet/ParquetIOSuite.scala | 122 ++++++++++--------- .../ParquetPartitionDiscoverySuite.scala | 9 +- .../datasources/parquet/ParquetQuerySuite.scala | 5 + .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 4 +- .../sql/sources/HadoopFsRelationTest.scala | 77 ++++++------ .../sources/ParquetHadoopFsRelationSuite.scala | 6 +- 9 files changed, 149 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index afd2250..2394cf3 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -17,6 +17,7 @@ package org.apache.spark.internal.io +import org.apache.hadoop.fs._ import org.apache.hadoop.mapreduce._ import org.apache.spark.util.Utils @@ -112,6 +113,14 @@ abstract class FileCommitProtocol { * just crashes (or killed) before it can call abort. */ def abortTask(taskContext: TaskAttemptContext): Unit + + /** + * Specifies that a file should be deleted with the commit of this job. The default + * implementation deletes the file immediately.
+ */ + def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = { + fs.delete(path, recursive) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 423009e..652bcc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -88,11 +88,20 @@ case class InsertIntoHadoopFsRelationCommand( } val pathExists = fs.exists(qualifiedOutputPath) + // If we are appending data to an existing dir. + val isAppend = pathExists && (mode == SaveMode.Append) + + val committer = FileCommitProtocol.instantiate( + sparkSession.sessionState.conf.fileCommitProtocolClass, + jobId = java.util.UUID.randomUUID().toString, + outputPath = outputPath.toString, + isAppend = isAppend) + val doInsertion = (mode, pathExists) match { case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(s"path $qualifiedOutputPath already exists.") case (SaveMode.Overwrite, true) => - deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations) + deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) true case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => true @@ -101,15 +110,8 @@ case class InsertIntoHadoopFsRelationCommand( case (s, exists) => throw new IllegalStateException(s"unsupported save mode $s ($exists)") } - // If we are appending data to an existing dir. - val isAppend = pathExists && (mode == SaveMode.Append) if (doInsertion) { - val committer = FileCommitProtocol.instantiate( - sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, - outputPath = outputPath.toString, - isAppend = isAppend) // Callback for updating metastore partition metadata after the insertion job completes. def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = { @@ -160,7 +162,8 @@ case class InsertIntoHadoopFsRelationCommand( private def deleteMatchingPartitions( fs: FileSystem, qualifiedOutputPath: Path, - customPartitionLocations: Map[TablePartitionSpec, String]): Unit = { + customPartitionLocations: Map[TablePartitionSpec, String], + committer: FileCommitProtocol): Unit = { val staticPartitionPrefix = if (staticPartitions.nonEmpty) { "/" + partitionColumns.flatMap { p => staticPartitions.get(p.name) match { @@ -175,7 +178,7 @@ case class InsertIntoHadoopFsRelationCommand( } // first clear the path determined by the static partition keys (e.g. 
/table/foo=1) val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix) - if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) { + if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) { throw new IOException(s"Unable to clear output " + s"directory $staticPrefixPath prior to writing to it") } @@ -185,7 +188,7 @@ case class InsertIntoHadoopFsRelationCommand( (staticPartitions.toSet -- spec).isEmpty, "Custom partition location did not match static partitioning keys") val path = new Path(customLoc) - if (fs.exists(path) && !fs.delete(path, true)) { + if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) { throw new IOException(s"Unable to clear partition " + s"directory $path prior to writing to it") } http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala index 7679e85..becb3aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala @@ -31,7 +31,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext { // ignore hidden files val allFiles = dir.listFiles(new FilenameFilter { override def accept(dir: File, name: String): Boolean = { - !name.startsWith(".") + !name.startsWith(".") && !name.startsWith("_") } }) val totalSize = allFiles.map(_.length()).sum http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index acdadb3..dbdcd23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -462,16 +463,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") { - val extraOptions = Map( - SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName, - "spark.sql.parquet.output.committer.class" -> - classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName - ) - withTempPath { dir => - val message = intercept[SparkException] { - spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - 
assert(message === "Intentional exception for testing purposes") + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + val extraOptions = Map( + SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName, + "spark.sql.parquet.output.committer.class" -> + classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName + ) + withTempPath { dir => + val message = intercept[SparkException] { + spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(message === "Intentional exception for testing purposes") + } } } @@ -488,58 +492,64 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-7837 Do not close output writer twice when commitTask() fails") { - // Using a output committer that always fail when committing a task, so that both - // `commitTask()` and `abortTask()` are invoked. - val extraOptions = Map[String, String]( - "spark.sql.parquet.output.committer.class" -> - classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName - ) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + // Using a output committer that always fail when committing a task, so that both + // `commitTask()` and `abortTask()` are invoked. + val extraOptions = Map[String, String]( + "spark.sql.parquet.output.committer.class" -> + classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName + ) + + // Before fixing SPARK-7837, the following code results in an NPE because both + // `commitTask()` and `abortTask()` try to close output writers. - // Before fixing SPARK-7837, the following code results in an NPE because both - // `commitTask()` and `abortTask()` try to close output writers. - - withTempPath { dir => - val m1 = intercept[SparkException] { - spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(m1.contains("Intentional exception for testing purposes")) - } + withTempPath { dir => + val m1 = intercept[SparkException] { + spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(m1.contains("Intentional exception for testing purposes")) + } - withTempPath { dir => - val m2 = intercept[SparkException] { - val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1) - df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(m2.contains("Intentional exception for testing purposes")) + withTempPath { dir => + val m2 = intercept[SparkException] { + val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1) + df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(m2.contains("Intentional exception for testing purposes")) + } } } test("SPARK-11044 Parquet writer version fixed as version1 ") { - // For dictionary encoding, Parquet changes the encoding types according to its writer - // version. So, this test checks one of the encoding types in order to ensure that - // the file is written with writer version2. - val extraOptions = Map[String, String]( - // Write a Parquet file with writer version2. - ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString, - // By default, dictionary encoding is enabled from Parquet 1.2.0 but - // it is enabled just in case. 
- ParquetOutputFormat.ENABLE_DICTIONARY -> "true" - ) - - val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) - - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/part-r-0.parquet" - spark.range(1 << 16).selectExpr("(id % 4) AS i") - .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) - - val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head - val columnChunkMetadata = blockMetadata.getColumns.asScala.head - - // If the file is written with version2, this should include - // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY - assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + // For dictionary encoding, Parquet changes the encoding types according to its writer + // version. So, this test checks one of the encoding types in order to ensure that + // the file is written with writer version2. + val extraOptions = Map[String, String]( + // Write a Parquet file with writer version2. + ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString, + // By default, dictionary encoding is enabled from Parquet 1.2.0 but + // it is enabled just in case. + ParquetOutputFormat.ENABLE_DICTIONARY -> "true" + ) + + val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) + + withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/part-r-0.parquet" + spark.range(1 << 16).selectExpr("(id % 4) AS i") + .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) + + val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head + val columnChunkMetadata = blockMetadata.getColumns.asScala.head + + // If the file is written with version2, this should include + // Encoding.RLE_DICTIONARY type. 
For version1, it is Encoding.PLAIN_DICTIONARY + assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index f433a74..420cff8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -455,7 +455,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha assert(partDf.schema.map(_.name) === Seq("intField", "stringField")) path.listFiles().foreach { f => - if (f.getName.toLowerCase().endsWith(".parquet")) { + if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) { // when the input is a path to a parquet file val df = spark.read.parquet(f.getCanonicalPath) assert(df.schema.map(_.name) === Seq("intField", "stringField")) @@ -463,7 +463,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } path.listFiles().foreach { f => - if (f.getName.toLowerCase().endsWith(".parquet")) { + if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) { // when the input is a path to a parquet file but `basePath` is overridden to // the base path containing partitioning directories val df = spark @@ -932,7 +932,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha withTempPath { dir => val path = dir.getCanonicalPath - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withSQLConf( + ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true", + "spark.sql.sources.commitProtocolClass" -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { spark.range(3).write.parquet(s"$path/p0=0/p1=0") } http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 4c4a7d8..ee7f2d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -178,6 +179,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with 
SharedSQLContext } withSQLConf( + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName, SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true", ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true" @@ -186,6 +189,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } withSQLConf( + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName, SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false" ) { http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index e678cf6..4f771ca 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -93,7 +93,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { .orc(path) // Check if this is compressed as ZLIB. - val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) + val maybeOrcFile = new File(path).listFiles().find { f => + !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc") + } assert(maybeOrcFile.isDefined) val orcFilePath = maybeOrcFile.get.toPath.toString val expectedCompressionKind = http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 2446bed..d23b66a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -784,44 +784,47 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("SPARK-8578 specified custom output committer will not be used to append data") { - val extraOptions = Map[String, String]( - SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName, - // Since Parquet has its own output committer setting, also set it - // to AlwaysFailParquetOutputCommitter at here. 
- "spark.sql.parquet.output.committer.class" -> - classOf[AlwaysFailParquetOutputCommitter].getName - ) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + val extraOptions = Map[String, String]( + SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName, + // Since Parquet has its own output committer setting, also set it + // to AlwaysFailParquetOutputCommitter at here. + "spark.sql.parquet.output.committer.class" -> + classOf[AlwaysFailParquetOutputCommitter].getName + ) - val df = spark.range(1, 10).toDF("i") - withTempPath { dir => - df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - // Because there data already exists, - // this append should succeed because we will use the output committer associated - // with file format and AlwaysFailOutputCommitter will not be used. - df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - checkAnswer( - spark.read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .options(extraOptions) - .load(dir.getCanonicalPath), - df.union(df)) - - // This will fail because AlwaysFailOutputCommitter is used when we do append. - intercept[Exception] { - df.write.mode("overwrite") - .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath) + val df = spark.range(1, 10).toDF("i") + withTempPath { dir => + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + // Because there data already exists, + // this append should succeed because we will use the output committer associated + // with file format and AlwaysFailOutputCommitter will not be used. + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + checkAnswer( + spark.read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .load(dir.getCanonicalPath), + df.union(df)) + + // This will fail because AlwaysFailOutputCommitter is used when we do append. + intercept[Exception] { + df.write.mode("overwrite") + .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath) + } } - } - withTempPath { dir => - // Because there is no existing data, - // this append will fail because AlwaysFailOutputCommitter is used when we do append - // and there is no existing data. - intercept[Exception] { - df.write.mode("append") - .options(extraOptions) - .format(dataSourceName) - .save(dir.getCanonicalPath) + withTempPath { dir => + // Because there is no existing data, + // this append will fail because AlwaysFailOutputCommitter is used when we do append + // and there is no existing data. 
+ intercept[Exception] { + df.write.mode("append") + .options(extraOptions) + .format(dataSourceName) + .save(dir.getCanonicalPath) + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c71b2548/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 8aa018d..03207ab 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -125,7 +126,10 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { } test("SPARK-8604: Parquet data source should write summary file while doing appending") { - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withSQLConf( + ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true", + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { withTempPath { dir => val path = dir.getCanonicalPath val df = spark.range(0, 5).toDF() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
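----------------------------------------------------------------------
As a rough illustration of how the new deleteWithJob() hook can be used (a hypothetical sketch, not the protocol Spark ships or the one added by this commit): a custom commit protocol could rename data aside in deleteWithJob, drop the backups only in commitJob, and restore them in abortJob, so an aborted overwrite leaves the original files intact. The class name BackupOnDeleteCommitProtocol and the backup-path scheme are invented for illustration, and the two-argument HadoopMapReduceCommitProtocol(jobId, path) constructor is assumed to match this revision.

import scala.collection.mutable

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.JobContext

import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical protocol: deleteWithJob() moves data aside instead of deleting it,
// so the original files survive until the job is known to have committed.
class BackupOnDeleteCommitProtocol(jobId: String, outputPath: String)
  extends HadoopMapReduceCommitProtocol(jobId, outputPath) {

  // (original path, backup path) pairs; driver-side only, so not shipped to executors.
  @transient private lazy val backups = mutable.ArrayBuffer[(Path, Path)]()

  override def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
    if (fs.exists(path)) {
      // Rename into a dot-prefixed sibling so readers and partition discovery ignore it.
      // The rename moves the whole tree, which covers the recursive case.
      val backup = new Path(path.getParent, s".${path.getName}-$jobId.backup")
      backups += ((path, backup))
      fs.rename(path, backup)
    } else {
      true
    }
  }

  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    super.commitJob(jobContext, taskCommits)
    // The original data is removed only after the new output is committed.
    backups.foreach { case (_, backup) =>
      backup.getFileSystem(jobContext.getConfiguration).delete(backup, true)
    }
  }

  override def abortJob(jobContext: JobContext): Unit = {
    // Best-effort restore of the original data; a production implementation would
    // also have to deal with partially written new output under the same path.
    backups.foreach { case (original, backup) =>
      val fs = backup.getFileSystem(jobContext.getConfiguration)
      if (fs.exists(backup) && !fs.exists(original)) fs.rename(backup, original)
    }
    super.abortJob(jobContext)
  }
}

Such a protocol would be selected through the same setting the updated tests pin to SQLHadoopMapReduceCommitProtocol, i.e. spark.sql.sources.commitProtocolClass (SQLConf.FILE_COMMIT_PROTOCOL_CLASS), for example: spark.conf.set("spark.sql.sources.commitProtocolClass", classOf[BackupOnDeleteCommitProtocol].getCanonicalName).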