Repository: spark
Updated Branches:
  refs/heads/master c8fb776d4 -> 6dddb70c3


[SPARK-15646][SQL] When spark.sql.hive.convertCTAS is true, the conversion rule 
needs to respect TEXTFILE/SEQUENCEFILE format and the user-defined location

## What changes were proposed in this pull request?
When `spark.sql.hive.convertCTAS` is true, for a CTAS statement, we will create 
a data source table using the default source (i.e. parquet) if the CTAS does 
not specify any Hive storage format. However, there are two issues with this 
conversion logic.
1. First, we determine if a CTAS statement defines storage format by checking 
the serde. However, TEXTFILE/SEQUENCEFILE does not have a default serde. When 
we do the check, we have not set the default serde. So, a query like `CREATE 
TABLE abc STORED AS TEXTFILE AS SELECT ...` actually creates a data source 
parquet table.
2. In the conversion logic, we are ignoring the user-specified location.

This PR fixes the above two issues.

Also, this PR makes the parser throws an exception when a CTAS statement has a 
PARTITIONED BY clause. This change is made because Hive's syntax does not allow 
it and our current implementation actually does not work for this case (the 
insert operation always throws an exception because the insertion does not pick 
up the partitioning info).

## How was this patch tested?
I am adding new tests in SQLQuerySuite and HiveDDLCommandSuite.

Author: Yin Huai <yh...@databricks.com>

Closes #13386 from yhuai/SPARK-14507.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6dddb70c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6dddb70c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6dddb70c

Branch: refs/heads/master
Commit: 6dddb70c387ed1f002d2602b2b1f919ef021de91
Parents: c8fb776
Author: Yin Huai <yh...@databricks.com>
Authored: Wed Jun 1 17:55:37 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Jun 1 17:55:37 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkSqlParser.scala    |  37 ++++-
 .../spark/sql/execution/command/tables.scala    |   2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  10 ++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  57 ++------
 .../spark/sql/hive/HiveSessionState.scala       |  16 ---
 .../org/apache/spark/sql/hive/HiveUtils.scala   |   6 -
 .../CreateHiveTableAsSelectCommand.scala        | 102 ++++++++++++++
 .../execution/CreateTableAsSelectCommand.scala  | 101 --------------
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  25 ++--
 .../sql/hive/execution/HiveExplainSuite.scala   |   6 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 135 ++++++++++++++-----
 11 files changed, 273 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6c19bf0..01409c6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -839,7 +839,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a table, returning either a [[CreateTableCommand]] or a
-   * [[CreateTableAsSelectLogicalPlan]].
+   * [[CreateHiveTableAsSelectLogicalPlan]].
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -936,7 +936,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder 
{
       comment = comment)
 
     selectQuery match {
-      case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+      case Some(q) =>
+        // Hive does not allow to use a CTAS statement to create a partitioned 
table.
+        if (tableDesc.partitionColumnNames.nonEmpty) {
+          val errorMessage = "A Create Table As Select (CTAS) statement is not 
allowed to " +
+            "create a partitioned table using Hive's file formats. " +
+            "Please use the syntax of \"CREATE TABLE tableName USING 
dataSource " +
+            "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table 
through a " +
+            "CTAS statement."
+          throw operationNotAllowed(errorMessage, ctx)
+        }
+
+        val hasStorageProperties = (ctx.createFileFormat != null) || 
(ctx.rowFormat != null)
+        if (conf.convertCTAS && !hasStorageProperties) {
+          val mode = if (ifNotExists) SaveMode.Ignore else 
SaveMode.ErrorIfExists
+          // At here, both rowStorage.serdeProperties and 
fileStorage.serdeProperties
+          // are empty Maps.
+          val optionsWithPath = if (location.isDefined) {
+            Map("path" -> location.get)
+          } else {
+            Map.empty[String, String]
+          }
+          CreateTableUsingAsSelect(
+            tableIdent = tableDesc.identifier,
+            provider = conf.defaultDataSourceName,
+            temporary = false,
+            partitionColumns = tableDesc.partitionColumnNames.toArray,
+            bucketSpec = None,
+            mode = mode,
+            options = optionsWithPath,
+            q
+          )
+        } else {
+          CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+        }
       case None => CreateTableCommand(tableDesc, ifNotExists)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 1b89c6b..90db785 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -39,7 +39,7 @@ import 
org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-case class CreateTableAsSelectLogicalPlan(
+case class CreateHiveTableAsSelectLogicalPlan(
     tableDesc: CatalogTable,
     child: LogicalPlan,
     allowExisting: Boolean) extends UnaryNode with Command {

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d1db0dd..437e093 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -310,6 +310,14 @@ object SQLConf {
     .stringConf
     .createWithDefault("parquet")
 
+  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+    .internal()
+    .doc("When true, a table created by a Hive CTAS statement (no USING 
clause) " +
+      "without specifying any storage property will be converted to a data 
source table, " +
+      "using the data source set by spark.sql.sources.default.")
+    .booleanConf
+    .createWithDefault(false)
+
   // This is used to control the when we will split a schema's JSON string to 
multiple pieces
   // in order to fit the JSON string in metastore's table property (by 
default, the value has
   // a length restriction of 4000 characters). We will split the JSON string 
of a schema
@@ -632,6 +640,8 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
 
+  def convertCTAS: Boolean = getConf(CONVERT_CTAS)
+
   def partitionDiscoveryEnabled(): Boolean =
     getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ff395f3..f10afa7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
+import 
org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -446,53 +446,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
 
-      case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
-        val schema = if (table.schema.nonEmpty) {
-          table.schema
+      case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) 
=>
+        val desc = if (table.storage.serde.isEmpty) {
+          // add default serde
+          table.withNewStorage(
+            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
         } else {
-          child.output.map { a =>
-            CatalogColumn(a.name, a.dataType.catalogString, a.nullable)
-          }
+          table
         }
 
-        val desc = table.copy(schema = schema)
-
-        if (sessionState.convertCTAS && table.storage.serde.isEmpty) {
-          // Do the conversion when spark.sql.hive.convertCTAS is true and the 
query
-          // does not specify any storage format (file format and storage 
handler).
-          if (table.identifier.database.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-                "when spark.sql.hive.convertCTAS is set to true.")
-          }
+        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
-          val mode = if (allowExisting) SaveMode.Ignore else 
SaveMode.ErrorIfExists
-          CreateTableUsingAsSelect(
-            TableIdentifier(desc.identifier.table),
-            sessionState.conf.defaultDataSourceName,
-            temporary = false,
-            Array.empty[String],
-            bucketSpec = None,
-            mode,
-            options = Map.empty[String, String],
-            child
-          )
-        } else {
-          val desc = if (table.storage.serde.isEmpty) {
-            // add default serde
-            table.withNewStorage(
-              serde = 
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-          } else {
-            table
-          }
-
-          val QualifiedTableName(dbName, tblName) = 
getQualifiedTableName(table)
-
-          execution.CreateTableAsSelectCommand(
-            desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-            child,
-            allowExisting)
-        }
+        execution.CreateHiveTableAsSelectCommand(
+          desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
+          child,
+          allowExisting)
     }
   }
 
@@ -543,6 +511,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 /**
  * An override of the standard HDFS listing based catalog, that overrides the 
partition spec with
  * the information from the metastore.
+ *
  * @param tableBasePath The default base path of the Hive metastore table
  * @param partitionSpec The partition specifications from Hive metastore
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 081d85a..ca8e5f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -139,22 +139,6 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
   }
 
   /**
-   * When true, a table created by a Hive CTAS statement (no USING clause) 
will be
-   * converted to a data source table, using the data source set by 
spark.sql.sources.default.
-   * The table in CTAS statement will be converted when it meets any of the 
following conditions:
-   *   - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File 
Format (STORED AS), or
-   *     a Storage Handler (STORED BY), and the value of 
hive.default.fileformat in hive-site.xml
-   *     is either TextFile or SequenceFile.
-   *   - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the 
file format and no SerDe
-   *     is specified (no ROW FORMAT SERDE clause).
-   *   - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as 
the file format
-   *     and no SerDe is specified (no ROW FORMAT SERDE clause).
-   */
-  def convertCTAS: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_CTAS)
-  }
-
-  /**
    * When true, Hive Thrift server will execute SQL queries asynchronously 
using a thread pool."
    */
   def hiveThriftServerAsync: Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 88f4a2d..9ed357c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -96,12 +96,6 @@ private[spark] object HiveUtils extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
-    .doc("When true, a table created by a Hive CTAS statement (no USING 
clause) will be " +
-      "converted to a data source table, using the data source set by 
spark.sql.sources.default.")
-    .booleanConf
-    .createWithDefault(false)
-
   val CONVERT_METASTORE_ORC = 
SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
     .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables 
instead of " +
       "the built in support.")

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
new file mode 100644
index 0000000..b809938
--- /dev/null
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hive.MetastoreRelation
+
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the Table Describe, which may contains serde, storage 
handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param ignoreIfExists allow continue working if it's already exists, 
otherwise
+ *                      raise exception
+ */
+private[hive]
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    ignoreIfExists: Boolean)
+  extends RunnableCommand {
+
+  private val tableIdentifier = tableDesc.identifier
+
+  override def children: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    lazy val metastoreRelation: MetastoreRelation = {
+      import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+      import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+      import org.apache.hadoop.io.Text
+      import org.apache.hadoop.mapred.TextInputFormat
+
+      val withFormat =
+        tableDesc.withNewStorage(
+          inputFormat =
+            
tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
+          outputFormat =
+            tableDesc.storage.outputFormat
+              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, 
Text]].getName)),
+          serde = 
tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
+          compressed = tableDesc.storage.compressed)
+
+      val withSchema = if (withFormat.schema.isEmpty) {
+        // Hive doesn't support specifying the column list for target table in 
CTAS
+        // However we don't think SparkSQL should follow that.
+        tableDesc.copy(schema = query.output.map { c =>
+          CatalogColumn(c.name, c.dataType.catalogString)
+        })
+      } else {
+        withFormat
+      }
+
+      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists 
= false)
+
+      // Get the Metastore Relation
+      sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+        case r: MetastoreRelation => r
+      }
+    }
+    // TODO ideally, we should get the output data ready first and then
+    // add the relation into catalog, just in case of failure occurs while data
+    // processing.
+    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
+      if (ignoreIfExists) {
+        // table already exists, will do nothing, to keep consistent with Hive
+      } else {
+        throw new AnalysisException(s"$tableIdentifier already exists.")
+      }
+    } else {
+      sparkSession.sessionState.executePlan(InsertIntoTable(
+        metastoreRelation, Map(), query, overwrite = true, ifNotExists = 
false)).toRdd
+    }
+
+    Seq.empty[Row]
+  }
+
+  override def argString: String = {
+    s"[Database:${tableDesc.database}}, " +
+    s"TableName: ${tableDesc.identifier.table}, " +
+    s"InsertIntoHiveTable]"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
deleted file mode 100644
index cfe6149..0000000
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.MetastoreRelation
-
-
-/**
- * Create table and insert the query result into it.
- * @param tableDesc the Table Describe, which may contains serde, storage 
handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param allowExisting allow continue working if it's already exists, 
otherwise
- *                      raise exception
- */
-private[hive]
-case class CreateTableAsSelectCommand(
-    tableDesc: CatalogTable,
-    query: LogicalPlan,
-    allowExisting: Boolean)
-  extends RunnableCommand {
-
-  private val tableIdentifier = tableDesc.identifier
-
-  override def children: Seq[LogicalPlan] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    lazy val metastoreRelation: MetastoreRelation = {
-      import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-      import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-      import org.apache.hadoop.io.Text
-      import org.apache.hadoop.mapred.TextInputFormat
-
-      val withFormat =
-        tableDesc.withNewStorage(
-          inputFormat =
-            
tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
-          outputFormat =
-            tableDesc.storage.outputFormat
-              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, 
Text]].getName)),
-          serde = 
tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
-          compressed = tableDesc.storage.compressed)
-
-      val withSchema = if (withFormat.schema.isEmpty) {
-        // Hive doesn't support specifying the column list for target table in 
CTAS
-        // However we don't think SparkSQL should follow that.
-        tableDesc.copy(schema = query.output.map { c =>
-          CatalogColumn(c.name, c.dataType.catalogString)
-        })
-      } else {
-        withFormat
-      }
-
-      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists 
= false)
-
-      // Get the Metastore Relation
-      sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-        case r: MetastoreRelation => r
-      }
-    }
-    // TODO ideally, we should get the output data ready first and then
-    // add the relation into catalog, just in case of failure occurs while data
-    // processing.
-    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
-      if (allowExisting) {
-        // table already exists, will do nothing, to keep consistent with Hive
-      } else {
-        throw new AnalysisException(s"$tableIdentifier already exists.")
-      }
-    } else {
-      sparkSession.sessionState.executePlan(InsertIntoTable(
-        metastoreRelation, Map(), query, overwrite = true, ifNotExists = 
false)).toRdd
-    }
-
-    Seq.empty[Row]
-  }
-
-  override def argString: String = {
-    s"[Database:${tableDesc.database}}, " +
-    s"TableName: ${tableDesc.identifier.table}, " +
-    s"InsertIntoHiveTable]"
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 3297a6f..ba9fe54 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -36,7 +36,7 @@ class HiveDDLCommandSuite extends PlanTest {
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
       case c: CreateTableCommand => (c.table, c.ifNotExists)
-      case c: CreateTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
+      case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, 
c.allowExisting)
       case c: CreateViewCommand => (c.tableDesc, c.allowExisting)
     }.head
   }
@@ -58,7 +58,6 @@ class HiveDDLCommandSuite extends PlanTest {
         |ip STRING COMMENT 'IP Address of the User',
         |country STRING COMMENT 'country of origination')
         |COMMENT 'This is the staging page view table'
-        |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 
'hour of the day')
         |STORED AS RCFILE
         |LOCATION '/user/external/page_view'
         |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
@@ -76,16 +75,12 @@ class HiveDDLCommandSuite extends PlanTest {
       CatalogColumn("page_url", "string") ::
       CatalogColumn("referrer_url", "string") ::
       CatalogColumn("ip", "string", comment = Some("IP Address of the User")) 
::
-      CatalogColumn("country", "string", comment = Some("country of 
origination")) ::
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: 
Nil)
+      CatalogColumn("country", "string", comment = Some("country of 
origination")) :: Nil)
     assert(desc.comment == Some("This is the staging page view table"))
     // TODO will be SQLText
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns ==
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: 
Nil)
+    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
     assert(desc.storage.inputFormat == 
Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
     assert(desc.storage.outputFormat == 
Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==
@@ -103,7 +98,6 @@ class HiveDDLCommandSuite extends PlanTest {
         |ip STRING COMMENT 'IP Address of the User',
         |country STRING COMMENT 'country of origination')
         |COMMENT 'This is the staging page view table'
-        |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 
'hour of the day')
         |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
         | STORED AS
         | INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
@@ -124,16 +118,12 @@ class HiveDDLCommandSuite extends PlanTest {
       CatalogColumn("page_url", "string") ::
       CatalogColumn("referrer_url", "string") ::
       CatalogColumn("ip", "string", comment = Some("IP Address of the User")) 
::
-      CatalogColumn("country", "string", comment = Some("country of 
origination")) ::
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: 
Nil)
+      CatalogColumn("country", "string", comment = Some("country of 
origination")) :: Nil)
     // TODO will be SQLText
     assert(desc.comment == Some("This is the staging page view table"))
     assert(desc.viewText.isEmpty)
     assert(desc.viewOriginalText.isEmpty)
-    assert(desc.partitionColumns ==
-      CatalogColumn("dt", "string", comment = Some("date type")) ::
-      CatalogColumn("hour", "string", comment = Some("hour of the day")) :: 
Nil)
+    assert(desc.partitionColumns == Seq.empty[CatalogColumn])
     assert(desc.storage.serdeProperties == Map())
     assert(desc.storage.inputFormat == 
Some("parquet.hive.DeprecatedParquetInputFormat"))
     assert(desc.storage.outputFormat == 
Some("parquet.hive.DeprecatedParquetOutputFormat"))
@@ -195,6 +185,11 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22")))
   }
 
+  test("CTAS statement with a PARTITIONED BY clause is not allowed") {
+    assertUnsupported(s"CREATE TABLE ctas1 PARTITIONED BY (k int)" +
+      " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp")
+  }
+
   test("unsupported operations") {
     intercept[ParseException] {
       parser.parsePlan(

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 0d08f7e..a43eed9 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -53,7 +53,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleto
       "== Analyzed Logical Plan ==",
       "== Optimized Logical Plan ==",
       "== Physical Plan ==",
-      "CreateTableAsSelect",
+      "CreateHiveTableAsSelect",
       "InsertIntoHiveTable",
       "Limit",
       "src")
@@ -71,7 +71,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleto
       "== Analyzed Logical Plan ==",
       "== Optimized Logical Plan ==",
       "== Physical Plan ==",
-      "CreateTableAsSelect",
+      "CreateHiveTableAsSelect",
       "InsertIntoHiveTable",
       "Limit",
       "src")
@@ -92,7 +92,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleto
       val shouldContain =
         "== Parsed Logical Plan ==" :: "== Analyzed Logical Plan ==" :: 
"Subquery" ::
         "== Optimized Logical Plan ==" :: "== Physical Plan ==" ::
-        "CreateTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
+        "CreateHiveTableAsSelect" :: "InsertIntoHiveTable" :: "jt" :: Nil
       for (key <- shouldContain) {
         assert(outputs.contains(key), s"$key doesn't exist in result")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/6dddb70c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 2a9b06b..b569145 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,11 +24,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -376,78 +379,138 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
     )
   }
 
-  test("CTAS without serde") {
-    def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = 
{
-      val relation = EliminateSubqueryAliases(
-        sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
-      relation match {
-        case LogicalRelation(r: HadoopFsRelation, _, _) =>
-          if (!isDataSourceParquet) {
-            fail(
-              s"${classOf[MetastoreRelation].getCanonicalName} is expected, 
but found " +
+  def checkRelation(
+      tableName: String,
+      isDataSourceParquet: Boolean,
+      format: String,
+      userSpecifiedLocation: Option[String] = None): Unit = {
+    val relation = EliminateSubqueryAliases(
+      sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
+    val catalogTable =
+      sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+    relation match {
+      case LogicalRelation(r: HadoopFsRelation, _, _) =>
+        if (!isDataSourceParquet) {
+          fail(
+            s"${classOf[MetastoreRelation].getCanonicalName} is expected, but 
found " +
               s"${HadoopFsRelation.getClass.getCanonicalName}.")
-          }
+        }
+        userSpecifiedLocation match {
+          case Some(location) =>
+            assert(r.options("path") === location)
+          case None => // OK.
+        }
+        assert(
+          
catalogTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) === 
format)
 
-        case r: MetastoreRelation =>
-          if (isDataSourceParquet) {
-            fail(
-              s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but 
found " +
+      case r: MetastoreRelation =>
+        if (isDataSourceParquet) {
+          fail(
+            s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but 
found " +
               s"${classOf[MetastoreRelation].getCanonicalName}.")
-          }
-      }
+        }
+        userSpecifiedLocation match {
+          case Some(location) =>
+            assert(r.catalogTable.storage.locationUri.get === location)
+          case None => // OK.
+        }
+        // Also make sure that the format is the desired format.
+        
assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format))
     }
 
-    val originalConf = sessionState.convertCTAS
+    // When a user-specified location is defined, the table type needs to be 
EXTERNAL.
+    val actualTableType = catalogTable.tableType
+    userSpecifiedLocation match {
+      case Some(location) =>
+        assert(actualTableType === CatalogTableType.EXTERNAL)
+      case None =>
+        assert(actualTableType === CatalogTableType.MANAGED)
+    }
+  }
 
-    setConf(HiveUtils.CONVERT_CTAS, true)
+  test("CTAS without serde without location") {
+    val originalConf = sessionState.conf.convertCTAS
 
+    setConf(SQLConf.CONVERT_CTAS, true)
+
+    val defaultDataSource = sessionState.conf.defaultDataSourceName
     try {
       sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, 
value")
       sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src 
ORDER BY k, value")
-      var message = intercept[AnalysisException] {
+      val message = intercept[AnalysisException] {
         sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, 
value")
       }.getMessage
       assert(message.contains("already exists"))
-      checkRelation("ctas1", true)
+      checkRelation("ctas1", true, defaultDataSource)
       sql("DROP TABLE ctas1")
 
       // Specifying database name for query can be converted to data source 
write path
       // is not allowed right now.
-      message = intercept[AnalysisException] {
-        sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER 
BY k, value")
-      }.getMessage
-      assert(
-        message.contains("Cannot specify database name in a CTAS statement"),
-        "When spark.sql.hive.convertCTAS is true, we should not allow " +
-            "database name specified.")
+      sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY 
k, value")
+      checkRelation("ctas1", true, defaultDataSource)
+      sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as textfile" +
           " AS SELECT key k, value FROM src ORDER BY k, value")
-      checkRelation("ctas1", true)
+      checkRelation("ctas1", false, "text")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as sequencefile" +
             " AS SELECT key k, value FROM src ORDER BY k, value")
-      checkRelation("ctas1", true)
+      checkRelation("ctas1", false, "sequence")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src 
ORDER BY k, value")
-      checkRelation("ctas1", false)
+      checkRelation("ctas1", false, "rcfile")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src 
ORDER BY k, value")
-      checkRelation("ctas1", false)
+      checkRelation("ctas1", false, "orc")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM 
src ORDER BY k, value")
-      checkRelation("ctas1", false)
+      checkRelation("ctas1", false, "parquet")
       sql("DROP TABLE ctas1")
     } finally {
-      setConf(HiveUtils.CONVERT_CTAS, originalConf)
+      setConf(SQLConf.CONVERT_CTAS, originalConf)
       sql("DROP TABLE IF EXISTS ctas1")
     }
   }
 
+  test("CTAS without serde with location") {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withTempDir { dir =>
+        val defaultDataSource = sessionState.conf.defaultDataSourceName
+
+        val tempLocation = dir.getCanonicalPath
+        sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c1'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", true, defaultDataSource, 
Some(s"file:$tempLocation/c1"))
+        sql("DROP TABLE ctas1")
+
+        sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c2'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", true, defaultDataSource, 
Some(s"file:$tempLocation/c2"))
+        sql("DROP TABLE ctas1")
+
+        sql(s"CREATE TABLE ctas1 stored as textfile LOCATION 
'file:$tempLocation/c3'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", false, "text", Some(s"file:$tempLocation/c3"))
+        sql("DROP TABLE ctas1")
+
+        sql(s"CREATE TABLE ctas1 stored as sequenceFile LOCATION 
'file:$tempLocation/c4'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", false, "sequence", 
Some(s"file:$tempLocation/c4"))
+        sql("DROP TABLE ctas1")
+
+        sql(s"CREATE TABLE ctas1 stored as rcfile LOCATION 
'file:$tempLocation/c5'" +
+          " AS SELECT key k, value FROM src ORDER BY k, value")
+        checkRelation("ctas1", false, "rcfile", Some(s"file:$tempLocation/c5"))
+        sql("DROP TABLE ctas1")
+      }
+    }
+  }
+
   test("CTAS with serde") {
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, 
value").collect()
     sql(
@@ -785,8 +848,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 
1}]}"""))
     read.json(rdd).createOrReplaceTempView("data")
-    val originalConf = sessionState.convertCTAS
-    setConf(HiveUtils.CONVERT_CTAS, false)
+    val originalConf = sessionState.conf.convertCTAS
+    setConf(SQLConf.CONVERT_CTAS, false)
 
     try {
       sql("CREATE TABLE explodeTest (key bigInt)")
@@ -805,7 +868,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
       sql("DROP TABLE explodeTest")
       dropTempTable("data")
     } finally {
-      setConf(HiveUtils.CONVERT_CTAS, originalConf)
+      setConf(SQLConf.CONVERT_CTAS, originalConf)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to