This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f89cdec [SPARK-26435][SQL] Support creating partitioned table using Hive CTAS by specifying partition column names f89cdec is described below commit f89cdec8b9a9fcc95ba7458869b4ba9d038560f9 Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Thu Dec 27 16:03:14 2018 +0800 [SPARK-26435][SQL] Support creating partitioned table using Hive CTAS by specifying partition column names ## What changes were proposed in this pull request? Spark SQL doesn't support creating partitioned table using Hive CTAS in SQL syntax. However it is supported by using DataFrameWriter API. ```scala val df = Seq(("a", 1)).toDF("part", "id") df.write.format("hive").partitionBy("part").saveAsTable("t") ``` Hive begins to support this syntax in newer version: https://issues.apache.org/jira/browse/HIVE-20241: ``` CREATE TABLE t PARTITIONED BY (part) AS SELECT 1 as id, "a" as part ``` This patch adds this support to SQL syntax. ## How was this patch tested? Added tests. Closes #23376 from viirya/hive-ctas-partitioned-table. Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 3 +- .../spark/sql/execution/SparkSqlParser.scala | 33 +++++++++------ .../spark/sql/hive/execution/HiveDDLSuite.scala | 48 +++++++++++++++++++++- .../spark/sql/hive/execution/SQLQuerySuite.scala | 4 +- 4 files changed, 71 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 5e732ed..b39681d 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -88,7 +88,8 @@ statement (AS? query)? #createTable | createTableHeader ('(' columns=colTypeList ')')? ((COMMENT comment=STRING) | - (PARTITIONED BY '(' partitionColumns=colTypeList ')') | + (PARTITIONED BY '(' partitionColumns=colTypeList ')' | + PARTITIONED BY partitionColumnNames=identifierList) | bucketSpec | skewSpec | rowFormat | diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 364efea..8deb55b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1196,33 +1196,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { selectQuery match { case Some(q) => - // Hive does not allow to use a CTAS statement to create a partitioned table. - if (tableDesc.partitionColumnNames.nonEmpty) { - val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " + - "create a partitioned table using Hive's file formats. " + - "Please use the syntax of \"CREATE TABLE tableName USING dataSource " + - "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " + - "CTAS statement." - operationNotAllowed(errorMessage, ctx) - } - // Don't allow explicit specification of schema for CTAS. - if (schema.nonEmpty) { + if (dataCols.nonEmpty) { operationNotAllowed( "Schema may not be specified in a Create Table As Select (CTAS) statement", ctx) } + // When creating partitioned table with CTAS statement, we can't specify data type for the + // partition columns. + if (partitionCols.nonEmpty) { + val errorMessage = "Create Partitioned Table As Select cannot specify data type for " + + "the partition columns of the target table." + operationNotAllowed(errorMessage, ctx) + } + + // Hive CTAS supports dynamic partition by specifying partition column names. + val partitionColumnNames = + Option(ctx.partitionColumnNames) + .map(visitIdentifierList(_).toArray) + .getOrElse(Array.empty[String]) + + val tableDescWithPartitionColNames = + tableDesc.copy(partitionColumnNames = partitionColumnNames) + val hasStorageProperties = (ctx.createFileFormat.size != 0) || (ctx.rowFormat.size != 0) if (conf.convertCTAS && !hasStorageProperties) { // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties // are empty Maps. - val newTableDesc = tableDesc.copy( + val newTableDesc = tableDescWithPartitionColNames.copy( storage = CatalogStorageFormat.empty.copy(locationUri = locUri), provider = Some(conf.defaultDataSourceName)) CreateTable(newTableDesc, mode, Some(q)) } else { - CreateTable(tableDesc, mode, Some(q)) + CreateTable(tableDescWithPartitionColNames, mode, Some(q)) } case None => CreateTable(tableDesc, mode, None) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index fd38944..6abdc40 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.net.URI -import java.util.Date import scala.language.existentials @@ -33,6 +32,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveExternalCatalog @@ -2370,4 +2370,50 @@ class HiveDDLSuite )) } } + + test("Hive CTAS can't create partitioned table by specifying schema") { + val err1 = intercept[ParseException] { + spark.sql( + s""" + |CREATE TABLE t (a int) + |PARTITIONED BY (b string) + |STORED AS parquet + |AS SELECT 1 as a, "a" as b + """.stripMargin) + }.getMessage + assert(err1.contains("Schema may not be specified in a Create Table As Select " + + "(CTAS) statement")) + + val err2 = intercept[ParseException] { + spark.sql( + s""" + |CREATE TABLE t + |PARTITIONED BY (b string) + |STORED AS parquet + |AS SELECT 1 as a, "a" as b + """.stripMargin) + }.getMessage + assert(err2.contains("Create Partitioned Table As Select cannot specify data type for " + + "the partition columns of the target table")) + } + + test("Hive CTAS with dynamic partition") { + Seq("orc", "parquet").foreach { format => + withTable("t") { + withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { + spark.sql( + s""" + |CREATE TABLE t + |PARTITIONED BY (b) + |STORED AS $format + |AS SELECT 1 as a, "a" as b + """.stripMargin) + checkAnswer(spark.table("t"), Row(1, "a")) + + assert(spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + .partitionColumnNames === Seq("b")) + } + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6acf446..70efad1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -692,8 +692,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |AS SELECT key, value FROM mytable1 """.stripMargin) }.getMessage - assert(e.contains("A Create Table As Select (CTAS) statement is not allowed to " + - "create a partitioned table using Hive's file formats")) + assert(e.contains("Create Partitioned Table As Select cannot specify data type for " + + "the partition columns of the target table")) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org