Repository: spark
Updated Branches:
  refs/heads/master d5f12bfe8 -> e50934f11


[SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements.

JIRA: https://issues.apache.org/jira/browse/SPARK-5723

Author: Yin Huai <yh...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <mich...@databricks.com>

Closes #4639 from yhuai/defaultCTASFileFormat and squashes the following commits:

a568137 [Yin Huai] Merge remote-tracking branch 'upstream/master' into defaultCTASFileFormat
ad2b07d [Yin Huai] Update tests and error messages.
8af5b2a [Yin Huai] Update conf key and unit test.
5a67903 [Yin Huai] Use data source write path for Hive's CTAS statements when no storage format/handler is specified.
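For readers skimming the change, a minimal sketch of the new behavior (a plain Scala sketch; the `hiveContext` value and table name are illustrative, and a `src` table is assumed to exist):

    // With the new flag on, a CTAS that names no SerDe, file format, or storage
    // handler is planned through the data source write path and stored using
    // spark.sql.sources.default (Parquet unless overridden).
    hiveContext.setConf("spark.sql.hive.convertCTAS", "true")
    hiveContext.sql("CREATE TABLE ctas_demo AS SELECT key, value FROM src")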


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e50934f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e50934f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e50934f1

Branch: refs/heads/master
Commit: e50934f11e1e3ded21a631e5ab69db3c79467137
Parents: d5f12bf
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Feb 17 18:14:33 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Feb 17 18:14:33 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveContext.scala | 15 ++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 75 ++++++++++++++++----
 .../spark/sql/hive/execution/commands.scala     | 17 +++--
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  6 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 70 ++++++++++++++++++
 5 files changed, 158 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6c55bc6..d3365b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] def convertMetastoreParquet: Boolean =
     getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
 
+  /**
+   * When true, a table created by a Hive CTAS statement (no USING clause) will be
+   * converted to a data source table, using the data source set by spark.sql.sources.default.
+   * The table in the CTAS statement will be converted when it meets any of the following conditions:
+   *   - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
+   *     a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
+   *     is either TextFile or SequenceFile.
+   *   - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
+   *     is specified (no ROW FORMAT SERDE clause).
+   *   - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
+   *     and no SerDe is specified (no ROW FORMAT SERDE clause).
+   */
+  protected[sql] def convertCTAS: Boolean =
+    getConf("spark.sql.hive.convertCTAS", "false").toBoolean
+
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 

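Note that the two flags above are parsed differently: convertMetastoreParquet compares the raw string against "true", while the new convertCTAS calls String.toBoolean. A standalone sketch of the difference (plain Scala, not part of the patch):

    // toBoolean is case-insensitive and throws on anything other than
    // "true"/"false"; string equality is case-sensitive and never throws.
    val viaEquals = "TRUE" == "true"   // false
    val viaToBool = "TRUE".toBoolean   // true
    // "yes".toBoolean => IllegalArgumentException; ("yes" == "true") => false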
http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index cfd6f27..f7ad2ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext}
+import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
-import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -502,24 +502,69 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           Some(sa.getQB().getTableDesc)
         }
 
-        execution.CreateTableAsSelect(
-          databaseName,
-          tableName,
-          child,
-          allowExisting,
-          desc)
+        // Check if the query specifies file format or storage handler.
+        val hasStorageSpec = desc match {
+          case Some(crtTbl) =>
+            crtTbl != null && (crtTbl.getSerName != null || crtTbl.getStorageHandler != null)
+          case None => false
+        }
+
+        if (hive.convertCTAS && !hasStorageSpec) {
+          // Do the conversion when spark.sql.hive.convertCTAS is true and the query
+          // does not specify any storage format (file format and storage handler).
+          if (dbName.isDefined) {
+            throw new AnalysisException(
+              "Cannot specify database name in a CTAS statement " +
+              "when spark.sql.hive.convertCTAS is set to true.")
+          }
+
+          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
+          CreateTableUsingAsSelect(
+            tblName,
+            hive.conf.defaultDataSourceName,
+            temporary = false,
+            mode,
+            options = Map.empty[String, String],
+            child
+          )
+        } else {
+          execution.CreateTableAsSelect(
+            databaseName,
+            tableName,
+            child,
+            allowExisting,
+            desc)
+        }
 
       case p: LogicalPlan if p.resolved => p
 
      case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
        val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-        val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
-        execution.CreateTableAsSelect(
-          databaseName,
-          tableName,
-          child,
-          allowExisting,
-          None)
+        if (hive.convertCTAS) {
+          if (dbName.isDefined) {
+            throw new AnalysisException(
+              "Cannot specify database name in a CTAS statement " +
+              "when spark.sql.hive.convertCTAS is set to true.")
+          }
+
+          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
+          CreateTableUsingAsSelect(
+            tblName,
+            hive.conf.defaultDataSourceName,
+            temporary = false,
+            mode,
+            options = Map.empty[String, String],
+            child
+          )
+        } else {
+          val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+          execution.CreateTableAsSelect(
+            databaseName,
+            tableName,
+            child,
+            allowExisting,
+            None)
+        }
     }
   }
 

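The conversion in both branches hinges on a single mapping: CREATE TABLE IF NOT EXISTS becomes SaveMode.Ignore and a plain CREATE TABLE becomes SaveMode.ErrorIfExists. A self-contained Scala sketch of that mapping (the helper name is hypothetical, not from the patch):

    import org.apache.spark.sql.SaveMode

    // Ignore: no-op when the target table already exists (IF NOT EXISTS).
    // ErrorIfExists: fail when the target table already exists (plain CTAS).
    def ctasSaveMode(allowExisting: Boolean): SaveMode =
      if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists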
http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6afd8ee..c88d0e6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.sources._
@@ -121,7 +122,7 @@ case class CreateMetastoreDataSource(
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
-        sys.error(s"Table $tableName already exists.")
+        throw new AnalysisException(s"Table $tableName already exists.")
       }
     }
 
@@ -172,9 +173,11 @@ case class CreateMetastoreDataSourceAsSelect(
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
-          sys.error(s"Table $tableName already exists. " +
-            s"If you want to append into it, please set mode to SaveMode.Append. " +
-            s"Or, if you want to overwrite it, please set mode to SaveMode.Overwrite.")
+          throw new AnalysisException(s"Table $tableName already exists. " +
+            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
+            s"insert data into the table, or set SaveMode to SaveMode.Overwrite to " +
+            s"overwrite the existing data. " +
+            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
         case SaveMode.Ignore =>
          // Since the table already exists and the save mode is Ignore, we will just return.
           return Seq.empty[Row]
@@ -199,7 +202,7 @@ case class CreateMetastoreDataSourceAsSelect(
                 s"== Actual Schema ==" +:
                  createdRelation.schema.treeString.split("\\n")).mkString("\n")}
               """.stripMargin
-                sys.error(errorMessage)
+                throw new AnalysisException(errorMessage)
               } else if (i != createdRelation.relation) {
                val errorDescription =
                  s"Cannot append to table $tableName because the resolved relation does not " +
@@ -216,10 +219,10 @@ case class CreateMetastoreDataSourceAsSelect(
                 s"== Actual Relation ==" ::
                   createdRelation.toString :: Nil).mkString("\n")}
               """.stripMargin
-                sys.error(errorMessage)
+                throw new AnalysisException(errorMessage)
               }
             case o =>
-              sys.error(s"Saving data in ${o.toString} is not supported.")
+              throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
           }
         case SaveMode.Overwrite =>
           hiveContext.sql(s"DROP TABLE IF EXISTS $tableName")

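One user-visible effect of replacing sys.error with AnalysisException is that the duplicate-table failure becomes a typed, catchable exception. An illustrative sketch (the `hiveContext` value and table name are assumptions, not from the patch):

    import org.apache.spark.sql.AnalysisException

    try {
      hiveContext.sql("CREATE TABLE ctas1 AS SELECT key, value FROM src")
    } catch {
      case e: AnalysisException =>
        // e.g. "Table ctas1 already exists. If you are using saveAsTable, ..."
        println(e.getMessage)
    }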
http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c30090f..e5156ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -306,8 +306,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         |SELECT * FROM jsonTable
       """.stripMargin)
 
-    // Create the table again should trigger a AlreadyExistsException.
-    val message = intercept[RuntimeException] {
+    // Creating the table again should trigger an AnalysisException.
+    val message = intercept[AnalysisException] {
       sql(
         s"""
         |CREATE TABLE ctasJsonTable
@@ -516,7 +516,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       sql("SELECT * FROM createdJsonTable"),
       df.collect())
 
-    var message = intercept[RuntimeException] {
+    var message = intercept[AnalysisException] {
       createExternalTable("createdJsonTable", filePath.toString)
     }.getMessage
     assert(message.contains("Table createdJsonTable already exists."),

http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ae03bc5..f2bc73b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
 
@@ -42,6 +45,73 @@ class SQLQuerySuite extends QueryTest {
     )
   }
 
+  test("CTAS without serde") {
+    def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
+      val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+      relation match {
+        case LogicalRelation(r: ParquetRelation2) =>
+          if (!isDataSourceParquet) {
+            fail(
+              s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
+              s"${ParquetRelation2.getClass.getCanonicalName}.")
+          }
+
+        case r: MetastoreRelation =>
+          if (isDataSourceParquet) {
+            fail(
+              s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
+              s"${classOf[MetastoreRelation].getCanonicalName}.")
+          }
+      }
+    }
+
+    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
+
+    setConf("spark.sql.hive.convertCTAS", "true")
+
+    sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    var message = intercept[AnalysisException] {
+      sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    }.getMessage
+    assert(message.contains("Table ctas1 already exists"))
+    checkRelation("ctas1", true)
+    sql("DROP TABLE ctas1")
+
+    // Specifying a database name in a CTAS statement that is converted to the
+    // data source write path is not allowed right now.
+    message = intercept[AnalysisException] {
+      sql("CREATE TABLE default.ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+    }.getMessage
+    assert(
+      message.contains("Cannot specify database name in a CTAS statement"),
+      "When spark.sql.hive.convertCTAS is true, specifying a database name " +
+      "should not be allowed.")
+
+    sql("CREATE TABLE ctas1 stored as textfile AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", true)
+    sql("DROP TABLE ctas1")
+
+    sql(
+      "CREATE TABLE ctas1 stored as sequencefile AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", true)
+    sql("DROP TABLE ctas1")
+
+    sql("CREATE TABLE ctas1 stored as rcfile AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", false)
+    sql("DROP TABLE ctas1")
+
+    sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", false)
+    sql("DROP TABLE ctas1")
+
+    sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
+    checkRelation("ctas1", false)
+    sql("DROP TABLE ctas1")
+
+    setConf("spark.sql.hive.convertCTAS", originalConf)
+  }
+
   test("CTAS with serde") {
    sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
     sql(

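Taken together, the new test pins down the conversion matrix when spark.sql.hive.convertCTAS is true: a bare CTAS, STORED AS TEXTFILE, and STORED AS SEQUENCEFILE (each without a SerDe) become data source Parquet tables, while RCFILE, ORC, and PARQUET keep Hive's native CTAS path. A compact sketch of that matrix, assuming the suite's TestHive `sql` helper and `src` table:

    sql("CREATE TABLE t1 AS SELECT key, value FROM src")                        // converted
    sql("CREATE TABLE t2 STORED AS TEXTFILE AS SELECT key, value FROM src")     // converted
    sql("CREATE TABLE t3 STORED AS SEQUENCEFILE AS SELECT key, value FROM src") // converted
    sql("CREATE TABLE t4 STORED AS RCFILE AS SELECT key, value FROM src")       // Hive CTAS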
