This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f89cdec  [SPARK-26435][SQL] Support creating partitioned table using Hive CTAS by specifying partition column names
f89cdec is described below

commit f89cdec8b9a9fcc95ba7458869b4ba9d038560f9
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Thu Dec 27 16:03:14 2018 +0800

    [SPARK-26435][SQL] Support creating partitioned table using Hive CTAS by specifying partition column names
    
    ## What changes were proposed in this pull request?
    
    Spark SQL doesn't support creating a partitioned table using Hive CTAS in SQL
    syntax. However, this is already supported through the DataFrameWriter API:
    
    ```scala
    val df = Seq(("a", 1)).toDF("part", "id")
    df.write.format("hive").partitionBy("part").saveAsTable("t")
    ```
    Hive began to support this syntax in newer versions (see
    https://issues.apache.org/jira/browse/HIVE-20241):
    
    ```
    CREATE TABLE t PARTITIONED BY (part) AS SELECT 1 as id, "a" as part
    ```
    
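    This patch adds the same support to Spark's SQL syntax: a Hive CTAS statement
    may now list partition column names in PARTITIONED BY. Specifying data types
    for the partition columns in a CTAS statement is still rejected by the parser.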
    
    ## How was this patch tested?
    
    Added tests to HiveDDLSuite and updated the expected error message in
    SQLQuerySuite.
    
    Closes #23376 from viirya/hive-ctas-partitioned-table.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |  3 +-
 .../spark/sql/execution/SparkSqlParser.scala       | 33 +++++++++------
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 48 +++++++++++++++++++++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  4 +-
 4 files changed, 71 insertions(+), 17 deletions(-)

diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 5e732ed..b39681d 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -88,7 +88,8 @@ statement
         (AS? query)?                                                   
#createTable
     | createTableHeader ('(' columns=colTypeList ')')?
         ((COMMENT comment=STRING) |
-        (PARTITIONED BY '(' partitionColumns=colTypeList ')') |
+        (PARTITIONED BY '(' partitionColumns=colTypeList ')' |
+        PARTITIONED BY partitionColumnNames=identifierList) |
         bucketSpec |
         skewSpec |
         rowFormat |
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 364efea..8deb55b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1196,33 +1196,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
 
     selectQuery match {
       case Some(q) =>
-        // Hive does not allow to use a CTAS statement to create a partitioned 
table.
-        if (tableDesc.partitionColumnNames.nonEmpty) {
-          val errorMessage = "A Create Table As Select (CTAS) statement is not 
allowed to " +
-            "create a partitioned table using Hive's file formats. " +
-            "Please use the syntax of \"CREATE TABLE tableName USING 
dataSource " +
-            "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table 
through a " +
-            "CTAS statement."
-          operationNotAllowed(errorMessage, ctx)
-        }
-
         // Don't allow explicit specification of schema for CTAS.
-        if (schema.nonEmpty) {
+        if (dataCols.nonEmpty) {
           operationNotAllowed(
             "Schema may not be specified in a Create Table As Select (CTAS) 
statement",
             ctx)
         }
 
+        // When creating partitioned table with CTAS statement, we can't 
specify data type for the
+        // partition columns.
+        if (partitionCols.nonEmpty) {
+          val errorMessage = "Create Partitioned Table As Select cannot 
specify data type for " +
+            "the partition columns of the target table."
+          operationNotAllowed(errorMessage, ctx)
+        }
+
+        // Hive CTAS supports dynamic partition by specifying partition column 
names.
+        val partitionColumnNames =
+          Option(ctx.partitionColumnNames)
+            .map(visitIdentifierList(_).toArray)
+            .getOrElse(Array.empty[String])
+
+        val tableDescWithPartitionColNames =
+          tableDesc.copy(partitionColumnNames = partitionColumnNames)
+
         val hasStorageProperties = (ctx.createFileFormat.size != 0) || 
(ctx.rowFormat.size != 0)
         if (conf.convertCTAS && !hasStorageProperties) {
           // At here, both rowStorage.serdeProperties and 
fileStorage.serdeProperties
           // are empty Maps.
-          val newTableDesc = tableDesc.copy(
+          val newTableDesc = tableDescWithPartitionColNames.copy(
             storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
             provider = Some(conf.defaultDataSourceName))
           CreateTable(newTableDesc, mode, Some(q))
         } else {
-          CreateTable(tableDesc, mode, Some(q))
+          CreateTable(tableDescWithPartitionColNames, mode, Some(q))
         }
       case None => CreateTable(tableDesc, mode, None)
     }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fd38944..6abdc40 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
 
 import java.io.File
 import java.net.URI
-import java.util.Date
 
 import scala.language.existentials
 
@@ -33,6 +32,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, 
Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -2370,4 +2370,50 @@ class HiveDDLSuite
       ))
     }
   }
+
+  test("Hive CTAS can't create partitioned table by specifying schema") {
+    val err1 = intercept[ParseException] {
+      spark.sql(
+        s"""
+           |CREATE TABLE t (a int)
+           |PARTITIONED BY (b string)
+           |STORED AS parquet
+           |AS SELECT 1 as a, "a" as b
+                 """.stripMargin)
+    }.getMessage
+    assert(err1.contains("Schema may not be specified in a Create Table As 
Select " +
+      "(CTAS) statement"))
+
+    val err2 = intercept[ParseException] {
+      spark.sql(
+        s"""
+           |CREATE TABLE t
+           |PARTITIONED BY (b string)
+           |STORED AS parquet
+           |AS SELECT 1 as a, "a" as b
+                 """.stripMargin)
+    }.getMessage
+    assert(err2.contains("Create Partitioned Table As Select cannot specify 
data type for " +
+      "the partition columns of the target table"))
+  }
+
+  test("Hive CTAS with dynamic partition") {
+    Seq("orc", "parquet").foreach { format =>
+      withTable("t") {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          spark.sql(
+            s"""
+               |CREATE TABLE t
+               |PARTITIONED BY (b)
+               |STORED AS $format
+               |AS SELECT 1 as a, "a" as b
+               """.stripMargin)
+          checkAnswer(spark.table("t"), Row(1, "a"))
+
+          
assert(spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            .partitionColumnNames === Seq("b"))
+        }
+      }
+    }
+  }
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6acf446..70efad1 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -692,8 +692,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
               |AS SELECT key, value FROM mytable1
             """.stripMargin)
         }.getMessage
-        assert(e.contains("A Create Table As Select (CTAS) statement is not 
allowed to " +
-          "create a partitioned table using Hive's file formats"))
+        assert(e.contains("Create Partitioned Table As Select cannot specify 
data type for " +
+          "the partition columns of the target table"))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to