Repository: spark
Updated Branches:
  refs/heads/branch-2.0 110876b9a -> adc1c2685


[SPARK-14346][SQL][FOLLOW-UP] add tests for CREATE TABLE USING with partition 
and bucket

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/12781 introduced PARTITIONED BY, CLUSTERED 
BY, and SORTED BY keywords to CREATE TABLE USING. This PR adds tests to make 
sure those keywords are handled correctly.

This PR also fixes a bug: we should create a non-Hive-compatible table if 
partition or bucket info exists.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #13144 from cloud-fan/add-test.

(cherry picked from commit 20a89478e168cb6901ef89f4cb6aa79193ed244a)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adc1c268
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adc1c268
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adc1c268

Branch: refs/heads/branch-2.0
Commit: adc1c2685ea0cfbf23716a4199b85c65021d15c6
Parents: 110876b
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue May 17 10:12:51 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue May 17 10:14:00 2016 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        | 11 +++-
 .../sql/execution/command/DDLCommandSuite.scala | 53 ++++++++++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala  | 44 ++++++++++++++++
 3 files changed, 106 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/adc1c268/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7d3c5257..70e5108 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -399,8 +399,8 @@ object CreateDataSourceTableUtils extends Logging {
             "Hive metastore in Spark SQL specific format, which is NOT 
compatible with Hive."
         (None, message)
 
-      case (Some(serde), relation: HadoopFsRelation)
-        if relation.location.paths.length == 1 && 
relation.partitionSchema.isEmpty =>
+      case (Some(serde), relation: HadoopFsRelation) if 
relation.location.paths.length == 1 &&
+        relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
         val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
         val message =
           s"Persisting data source relation $qualifiedTableName with a single 
input path " +
@@ -415,6 +415,13 @@ object CreateDataSourceTableUtils extends Logging {
             "Input path(s): " + relation.location.paths.mkString("\n", "\n", 
"")
         (None, message)
 
+      case (Some(serde), relation: HadoopFsRelation) if 
relation.bucketSpec.nonEmpty =>
+        val message =
+          s"Persisting bucketed data source relation $qualifiedTableName into 
" +
+            "Hive metastore in Spark SQL specific format, which is NOT 
compatible with Hive. " +
+            "Input path(s): " + relation.location.paths.mkString("\n", "\n", 
"")
+        (None, message)
+
       case (Some(serde), relation: HadoopFsRelation) =>
         val message =
           s"Persisting data source relation $qualifiedTableName with multiple 
input paths into " +

http://git-wip-us.apache.org/repos/asf/spark/blob/adc1c268/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 13df449..897170e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.execution.datasources.{BucketSpec, 
CreateTableUsing}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
 // TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
@@ -238,6 +240,57 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
+  test("create table using - with partitioned by") {
+    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet 
PARTITIONED BY (a)"
+    val expected = CreateTableUsing(
+      TableIdentifier("my_tab"),
+      Some(new StructType().add("a", IntegerType).add("b", StringType)),
+      "parquet",
+      false,
+      Map.empty,
+      null,
+      None,
+      false,
+      true)
+
+    parser.parsePlan(query) match {
+      case ct: CreateTableUsing =>
+        // We can't compare array in `CreateTableUsing` directly, so here we 
compare
+        // `partitionColumns` ahead, and make `partitionColumns` null before 
plan comparison.
+        assert(Seq("a") == ct.partitionColumns.toSeq)
+        comparePlans(ct.copy(partitionColumns = null), expected)
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from 
query," +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
+  test("create table using - with bucket") {
+    val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
+      "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
+    val expected = CreateTableUsing(
+      TableIdentifier("my_tab"),
+      Some(new StructType().add("a", IntegerType).add("b", StringType)),
+      "parquet",
+      false,
+      Map.empty,
+      null,
+      Some(BucketSpec(5, Seq("a"), Seq("b"))),
+      false,
+      true)
+
+    parser.parsePlan(query) match {
+      case ct: CreateTableUsing =>
+        // `Array.empty == Array.empty` returns false, here we set 
`partitionColumns` to null before
+        // plan comparison.
+        assert(ct.partitionColumns.isEmpty)
+        comparePlans(ct.copy(partitionColumns = null), expected)
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from 
query," +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
   // ALTER TABLE table_name RENAME TO new_table_name;
   // ALTER VIEW view_name RENAME TO new_view_name;
   test("alter table/view: rename table/view") {

http://git-wip-us.apache.org/repos/asf/spark/blob/adc1c268/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 82123be..d72dc09 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -29,8 +29,10 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, 
CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach 
{
   private val escapedIdentifier = "`(.+)`".r
@@ -350,6 +352,48 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
   }
 
+  test("create table using") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", 
"int")))
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+    }
+  }
+
+  test("create table using - with partitioned by") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema.isEmpty) // partitioned datasource table is not 
hive-compatible
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(DDLUtils.getSchemaFromTableProperties(table) ==
+        Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+      assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
+        Seq("a"))
+    }
+  }
+
+  test("create table using - with bucket") {
+    val catalog = spark.sessionState.catalog
+    withTable("tbl") {
+      sql("CREATE TABLE tbl(a INT, b INT) USING parquet " +
+        "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
+      val table = catalog.getTableMetadata(TableIdentifier("tbl"))
+      assert(table.tableType == CatalogTableType.MANAGED)
+      assert(table.schema.isEmpty) // partitioned datasource table is not 
hive-compatible
+      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(DDLUtils.getSchemaFromTableProperties(table) ==
+        Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
+      assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
+        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+    }
+  }
+
   test("alter table: rename") {
     val catalog = spark.sessionState.catalog
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to