Repository: spark Updated Branches: refs/heads/master 5a5bbc299 -> afa757c98
[SPARK-9849] [SQL] DirectParquetOutputCommitter qualified name should be backward compatible DirectParquetOutputCommitter was moved in SPARK-9763. However, users can explicitly set the class as a config option, so we must be able to resolve the old committer qualified name. Author: Reynold Xin <r...@databricks.com> Closes #8114 from rxin/SPARK-9849. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afa757c9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afa757c9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afa757c9 Branch: refs/heads/master Commit: afa757c98c537965007cad4c61c436887f3ac6a6 Parents: 5a5bbc2 Author: Reynold Xin <r...@databricks.com> Authored: Tue Aug 11 18:08:49 2015 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Tue Aug 11 18:08:49 2015 -0700 ---------------------------------------------------------------------- .../datasources/parquet/ParquetRelation.scala | 7 +++++ .../datasources/parquet/ParquetIOSuite.scala | 27 +++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/afa757c9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 4086a13..c71c69b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -209,6 +209,13 @@ private[sql] class ParquetRelation( override def prepareJobForWrite(job: Job): OutputWriterFactory = { val conf = ContextUtil.getConfiguration(job) + // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible + val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) + if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") { + conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, + classOf[DirectParquetOutputCommitter].getCanonicalName) + } + val committerClass = conf.getClass( SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, http://git-wip-us.apache.org/repos/asf/spark/blob/afa757c9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index ee925af..cb16634 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -390,7 +390,32 @@ class ParquetIOSuite extends QueryTest with ParquetTest { } } - test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") { + test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") { + val clonedConf = new Configuration(configuration) + + // Write to a parquet file and let it fail. + // _temporary should be missing if direct output committer works. + try { + configuration.set("spark.sql.parquet.output.committer.class", + "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") + sqlContext.udf.register("div0", (x: Int) => x / 0) + withTempPath { dir => + intercept[org.apache.spark.SparkException] { + sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath) + } + val path = new Path(dir.getCanonicalPath, "_temporary") + val fs = path.getFileSystem(configuration) + assert(!fs.exists(path)) + } + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + } + } + + + test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") { withTempPath { dir => val clonedConf = new Configuration(configuration) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org