Repository: spark
Updated Branches:
  refs/heads/master 5a5bbc299 -> afa757c98


[SPARK-9849] [SQL] DirectParquetOutputCommitter qualified name should be backward compatible

DirectParquetOutputCommitter was moved in SPARK-9763. However, users can 
explicitly set the committer class as a config option, so we must still be 
able to resolve the old committer's qualified name.

Author: Reynold Xin <r...@databricks.com>

Closes #8114 from rxin/SPARK-9849.
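
For illustration, a configuration written before SPARK-9763 might still reference the
committer by its old name. A minimal sketch (not part of the patch; the output path and
SQLContext usage here are assumptions):

    // Legacy setting: the pre-SPARK-9763 qualified name of the committer.
    // With this patch, ParquetRelation rewrites the value to the new package
    // (org.apache.spark.sql.execution.datasources.parquet) before the class
    // is loaded, so the write succeeds instead of failing to find the class.
    sqlContext.setConf(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

    // Any Parquet write now picks up the relocated committer.
    sqlContext.range(10).write.parquet("/tmp/direct-committer-output")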


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afa757c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afa757c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afa757c9

Branch: refs/heads/master
Commit: afa757c98c537965007cad4c61c436887f3ac6a6
Parents: 5a5bbc2
Author: Reynold Xin <r...@databricks.com>
Authored: Tue Aug 11 18:08:49 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Aug 11 18:08:49 2015 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetRelation.scala   |  7 +++++
 .../datasources/parquet/ParquetIOSuite.scala    | 27 +++++++++++++++++++-
 2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/afa757c9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 4086a13..c71c69b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -209,6 +209,13 @@ private[sql] class ParquetRelation(
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = ContextUtil.getConfiguration(job)
 
+    // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
+    val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
+    if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
+      conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
+        classOf[DirectParquetOutputCommitter].getCanonicalName)
+    }
+
     val committerClass =
       conf.getClass(
         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,

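For context, the remapped key is consumed just below by Hadoop's Configuration.getClass,
which falls back to ParquetOutputCommitter when nothing is configured. A standalone sketch
of that resolution (illustrative only; assumes spark-sql and parquet-hadoop on the classpath):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.OutputCommitter
    import org.apache.parquet.hadoop.ParquetOutputCommitter

    val conf = new Configuration()
    // After the rewrite above, the key holds the committer's new qualified name.
    conf.set("spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")
    // getClass loads that class, checks it is an OutputCommitter, and would
    // default to ParquetOutputCommitter if the key were unset.
    val committerClass = conf.getClass(
      "spark.sql.parquet.output.committer.class",
      classOf[ParquetOutputCommitter],
      classOf[OutputCommitter])
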
http://git-wip-us.apache.org/repos/asf/spark/blob/afa757c9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index ee925af..cb16634 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -390,7 +390,32 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
     }
   }
 
-  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be 
overriden") {
+  test("SPARK-9849 DirectParquetOutputCommitter qualified name should be 
backward compatible") {
+    val clonedConf = new Configuration(configuration)
+
+    // Write to a parquet file and let it fail.
+    // _temporary should be missing if direct output committer works.
+    try {
+      configuration.set("spark.sql.parquet.output.committer.class",
+        "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
+      sqlContext.udf.register("div0", (x: Int) => x / 0)
+      withTempPath { dir =>
+        intercept[org.apache.spark.SparkException] {
+          sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
+        }
+        val path = new Path(dir.getCanonicalPath, "_temporary")
+        val fs = path.getFileSystem(configuration)
+        assert(!fs.exists(path))
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
+
+
+  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be 
overridden") {
     withTempPath { dir =>
       val clonedConf = new Configuration(configuration)
 

