Repository: spark
Updated Branches:
  refs/heads/master 7d7a0a5e0 -> 2413fce9d


[SPARK-15743][SQL] Prevent saving with all-column partitioning

## What changes were proposed in this pull request?

When saving datasets on storage, `partitionBy` provides an easy way to 
construct the directory structure. However, if a user choose all columns as 
partition columns, some exceptions occurs.

- **ORC with all column partitioning**: `AnalysisException` on **future read** 
due to schema inference failure.
 ```scala
scala> 
spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

scala> spark.read.format("orc").load("/tmp/data").collect()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at 
/tmp/data. It must be specified manually;
```

- **Parquet with all-column partitioning**: `InvalidSchemaException` on **write 
execution** due to Parquet limitation.
 ```scala
scala> 
spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data")
[Stage 0:>                                                          (0 + 8) / 
8]16/06/02 16:51:17
ERROR Utils: Aborting task
org.apache.parquet.schema.InvalidSchemaException: A group type can not be 
empty. Parquet does not support empty group without leaves. Empty group: 
spark_schema
... (lots of error messages)
```

Although some formats like JSON support all-column partitioning without any 
problem, it seems not a good idea to make lots of empty directories.

This PR prevents saving with all-column partitioning by consistently raising 
`AnalysisException` before executing save operation.

## How was this patch tested?

Newly added `PartitioningUtilsSuite`.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13486 from dongjoon-hyun/SPARK-15743.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2413fce9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2413fce9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2413fce9

Branch: refs/heads/master
Commit: 2413fce9d6812a91eeffb4435c2b5b361d23214b
Parents: 7d7a0a5
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Fri Jun 10 12:43:27 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Jun 10 12:43:27 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 32 ++++++++++----------
 .../datasources/PartitioningUtils.scala         |  8 +++--
 .../spark/sql/execution/datasources/rules.scala |  4 +--
 .../execution/streaming/FileStreamSink.scala    |  2 +-
 .../test/DataFrameReaderWriterSuite.scala       | 12 ++++++++
 5 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5f17fdf..d327302 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.spark.sql.execution.datasources
 
@@ -432,7 +432,7 @@ case class DataSource(
         }
 
         val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           data.schema, partitionColumns, caseSensitive)
 
         // If we are appending to a table that already exists, make sure the 
partitioning matches

http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 74f2993..2340ff0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -339,7 +339,7 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
-  def validatePartitionColumnDataTypes(
+  def validatePartitionColumn(
       schema: StructType,
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
@@ -350,6 +350,10 @@ private[sql] object PartitioningUtils {
         case _ => throw new AnalysisException(s"Cannot use ${field.dataType} 
for partition column")
       }
     }
+
+    if (partitionColumns.size == schema.fields.size) {
+      throw new AnalysisException(s"Cannot use all columns for partition 
columns")
+    }
   }
 
   def partitionColumnsSchema(
@@ -359,7 +363,7 @@ private[sql] object PartitioningUtils {
     val equality = columnNameEquality(caseSensitive)
     StructType(partitionColumns.map { col =>
       schema.find(f => equality(f.name, col)).getOrElse {
-        throw new RuntimeException(s"Partition column $col not found in schema 
$schema")
+        throw new AnalysisException(s"Partition column $col not found in 
schema $schema")
       }
     }).asNullable
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 9afd715..7ac62fb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
           // OK
         }
 
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
 
         // Get all input data source relations of the query.
@@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
           // OK
         }
 
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
 
         for {

http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e191010..efb0491 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -91,7 +91,7 @@ class FileStreamSinkWriter(
     hadoopConf: Configuration,
     options: Map[String, String]) extends Serializable with Logging {
 
-  PartitioningUtils.validatePartitionColumnDataTypes(
+  PartitioningUtils.validatePartitionColumn(
     data.schema, partitionColumnNames, 
data.sqlContext.conf.caseSensitiveAnalysis)
 
   private val serializableConf = new SerializableConfiguration(hadoopConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/2413fce9/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 431a943..bf6063a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -572,4 +572,16 @@ class DataFrameReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
 
     cq.awaitTermination(2000L)
   }
+
+  test("prevent all column partitioning") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      intercept[AnalysisException] {
+        
spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
+      }
+      intercept[AnalysisException] {
+        
spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to