Repository: spark
Updated Branches:
  refs/heads/master ec29f2034 -> 6c4fdbec3


[SPARK-8887] [SQL] Explicitly define which data types can be used as dynamic 
partition columns

This PR enforces dynamic partition column data type requirements by adding 
analysis rules.

JIRA: https://issues.apache.org/jira/browse/SPARK-8887

Author: Yijie Shen <[email protected]>

Closes #8201 from yjshen/dynamic_partition_columns.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c4fdbec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c4fdbec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c4fdbec

Branch: refs/heads/master
Commit: 6c4fdbec33af287d24cd0995ecbd7191545d05c9
Parents: ec29f20
Author: Yijie Shen <[email protected]>
Authored: Fri Aug 14 21:03:14 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri Aug 14 21:03:14 2015 -0700

----------------------------------------------------------------------
 .../execution/datasources/PartitioningUtils.scala  | 13 +++++++++++++
 .../execution/datasources/ResolvedDataSource.scala |  5 ++++-
 .../execution/datasources/WriterContainer.scala    |  2 +-
 .../spark/sql/execution/datasources/rules.scala    |  8 ++++++--
 .../spark/sql/sources/hadoopFsRelationSuites.scala | 17 +++++++++++++++++
 5 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 66dfcc3..0a2007e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types._
@@ -270,6 +271,18 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
+  def validatePartitionColumnDataTypes(
+      schema: StructType,
+      partitionColumns: Array[String]): Unit = {
+
+    ResolvedDataSource.partitionColumnsSchema(schema, 
partitionColumns).foreach { field =>
+      field.dataType match {
+        case _: AtomicType => // OK
+        case _ => throw new AnalysisException(s"Cannot use ${field.dataType} 
for partition column")
+      }
+    }
+  }
+
   /**
    * Given a collection of [[Literal]]s, resolves possible type conflicts by 
up-casting "lower"
    * types.

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 7770bbd..8fbaf3a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -143,7 +143,7 @@ object ResolvedDataSource extends Logging {
     new ResolvedDataSource(clazz, relation)
   }
 
-  private def partitionColumnsSchema(
+  def partitionColumnsSchema(
       schema: StructType,
       partitionColumns: Array[String]): StructType = {
     StructType(partitionColumns.map { col =>
@@ -179,6 +179,9 @@ object ResolvedDataSource extends Logging {
           val fs = 
path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
         }
+
+        PartitioningUtils.validatePartitionColumnDataTypes(data.schema, 
partitionColumns)
+
         val dataSchema = StructType(data.schema.filterNot(f => 
partitionColumns.contains(f.name)))
         val r = dataSource.createRelation(
           sqlContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 2f11f40..d36197e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -287,7 +287,7 @@ private[sql] class DynamicPartitionWriterContainer(
           PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, 
StringType)), Seq(StringType))
       val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
       val partitionName = Literal(c.name + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: 
partitionName
+      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
     }
 
     // Returns the partition path given a partition key.

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 40ca8bf..9d3d356 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -116,6 +116,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
           // OK
         }
 
+        PartitioningUtils.validatePartitionColumnDataTypes(r.schema, 
part.keySet.toArray)
+
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
           case LogicalRelation(src: BaseRelation) => src
@@ -138,10 +140,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
           // OK
         }
 
-      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, 
query) =>
+      case CreateTableUsingAsSelect(tableName, _, _, partitionColumns, mode, 
_, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an 
input table of
         // the query. If so, we will throw an AnalysisException to let users 
know it is not allowed.
-        if (catalog.tableExists(Seq(tableName))) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(Seq(tableName))) 
{
           // Need to remove SubQuery operator.
           EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
             // Only do the check if the table is a data source table
@@ -164,6 +166,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
           // OK
         }
 
+        PartitioningUtils.validatePartitionColumnDataTypes(query.schema, 
partitionColumns)
+
       case _ => // OK
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c4fdbec/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index af44562..8d0d921 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources
 
+import java.sql.Date
+
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
@@ -553,6 +555,21 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils {
       clonedConf.foreach(entry => configuration.set(entry.getKey, 
entry.getValue))
     }
   }
+
+  test("SPARK-8887: Explicitly define which data types can be used as dynamic 
partition columns") {
+    val df = Seq(
+      (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
+      (2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")),
+      (3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", 
"b", "c", "d", "e")
+    withTempDir { file =>
+      intercept[AnalysisException] {
+        df.write.format(dataSourceName).partitionBy("c", "d", 
"e").save(file.getCanonicalPath)
+      }
+    }
+    intercept[AnalysisException] {
+      df.write.format(dataSourceName).partitionBy("c", "d", 
"e").saveAsTable("t")
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output 
committer when


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to