[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14482


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73643735
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-logWarning(
-  s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is 
deprecated, " +
-s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-ExecutedCommandExec(
-  CreateTempViewUsing(
-c.tableIdent, c.userSpecifiedSchema, replace = true, 
c.provider, c.options)) :: Nil
-
-  case c: CreateTableUsing if !c.temporary =>
+  case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == 
"hive" =>
+val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == 
SaveMode.Ignore)
+ExecutedCommandExec(cmd) :: Nil
+
+  case CreateTable(tableDesc, mode, None) =>
 val cmd =
   CreateDataSourceTableCommand(
-c.tableIdent,
-c.userSpecifiedSchema,
-c.provider,
-c.options,
-c.partitionColumns,
-c.bucketSpec,
-c.allowExisting,
-c.managedIfNoPath)
+tableDesc.identifier,
+if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else 
None,
+tableDesc.provider.get,
+tableDesc.storage.properties,
+tableDesc.partitionColumnNames.toArray,
+tableDesc.bucketSpec,
+ignoreIfExists = mode == SaveMode.Ignore,
+managedIfNoPath = tableDesc.tableType == 
CatalogTableType.MANAGED)
--- End diff --

comment added: 
https://github.com/apache/spark/pull/14482/files#diff-1bb4f7bd5a2656f48bcd3c857167a11bR331


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73638320
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
@@ -272,18 +259,15 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   schema: StructType,
   options: Map[String, String]): DataFrame = {
 val tableIdent = 
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-val cmd =
-  CreateTableUsing(
-tableIdent,
-userSpecifiedSchema = Some(schema),
-source,
-temporary = false,
-options,
-partitionColumns = Array.empty[String],
-bucketSpec = None,
-allowExisting = false,
-managedIfNoPath = false)
-sparkSession.sessionState.executePlan(cmd).toRdd
+val tableDesc = CatalogTable(
+  identifier = tableIdent,
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat.empty.copy(properties = options),
+  schema = schema,
+  provider = Some(source)
+)
+val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
--- End diff --

good catch! yea we should diable hive for now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73574347
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
@@ -272,18 +259,15 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   schema: StructType,
   options: Map[String, String]): DataFrame = {
 val tableIdent = 
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-val cmd =
-  CreateTableUsing(
-tableIdent,
-userSpecifiedSchema = Some(schema),
-source,
-temporary = false,
-options,
-partitionColumns = Array.empty[String],
-bucketSpec = None,
-allowExisting = false,
-managedIfNoPath = false)
-sparkSession.sessionState.executePlan(cmd).toRdd
+val tableDesc = CatalogTable(
+  identifier = tableIdent,
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat.empty.copy(properties = options),
+  schema = schema,
+  provider = Some(source)
+)
+val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
--- End diff --

If so, we need to populate the `location` of `storage`. Maybe we just 
disable the option `hive` and issue an exception when users do it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73568017
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-logWarning(
-  s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is 
deprecated, " +
-s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-ExecutedCommandExec(
-  CreateTempViewUsing(
-c.tableIdent, c.userSpecifiedSchema, replace = true, 
c.provider, c.options)) :: Nil
-
-  case c: CreateTableUsing if !c.temporary =>
+  case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == 
"hive" =>
+val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == 
SaveMode.Ignore)
+ExecutedCommandExec(cmd) :: Nil
+
+  case CreateTable(tableDesc, mode, None) =>
 val cmd =
   CreateDataSourceTableCommand(
-c.tableIdent,
-c.userSpecifiedSchema,
-c.provider,
-c.options,
-c.partitionColumns,
-c.bucketSpec,
-c.allowExisting,
-c.managedIfNoPath)
+tableDesc.identifier,
+if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else 
None,
+tableDesc.provider.get,
+tableDesc.storage.properties,
+tableDesc.partitionColumnNames.toArray,
+tableDesc.bucketSpec,
+ignoreIfExists = mode == SaveMode.Ignore,
+managedIfNoPath = tableDesc.tableType == 
CatalogTableType.MANAGED)
--- End diff --

A little bit tricky here, but the change is not a bug. Maybe leave a TODO 
here? When we reimplementing the write path, we can simplify the logics to 
determine the table type. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73557584
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
@@ -272,18 +259,15 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   schema: StructType,
   options: Map[String, String]): DataFrame = {
 val tableIdent = 
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-val cmd =
-  CreateTableUsing(
-tableIdent,
-userSpecifiedSchema = Some(schema),
-source,
-temporary = false,
-options,
-partitionColumns = Array.empty[String],
-bucketSpec = None,
-allowExisting = false,
-managedIfNoPath = false)
-sparkSession.sessionState.executePlan(cmd).toRdd
+val tableDesc = CatalogTable(
+  identifier = tableIdent,
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat.empty.copy(properties = options),
+  schema = schema,
+  provider = Some(source)
+)
+val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
--- End diff --

This change will have an external impact. The value of source is input from 
users. That means it could be "hive". That means, users can create a Hive serde 
Table. If this is desired, we need to document it and test it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73553259
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
@@ -223,20 +223,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   tableName: String,
   source: String,
   options: Map[String, String]): DataFrame = {
-val tableIdent = 
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-val cmd =
-  CreateTableUsing(
-tableIdent,
-userSpecifiedSchema = None,
-source,
-temporary = false,
-options = options,
-partitionColumns = Array.empty[String],
-bucketSpec = None,
-allowExisting = false,
-managedIfNoPath = false)
-sparkSession.sessionState.executePlan(cmd).toRdd
-sparkSession.table(tableIdent)
+createExternalTable(tableName, source, new StructType, options)
--- End diff --

This change will have an external impact. The value of `source` is input 
from users. That means it could be "hive". That means, users can create a Hive 
serde Table. If this is desired, we need to document it and test it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73478952
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -319,12 +319,26 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 }
 val options = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val provider = ctx.tableProvider.qualifiedName.getText
+val schema = Option(ctx.colTypeList()).map(createStructType)
 val partitionColumnNames =
   Option(ctx.partitionColumnNames)
 .map(visitIdentifierList(_).toArray)
 .getOrElse(Array.empty[String])
 val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
+val tableDesc = CatalogTable(
+  identifier = table,
+  tableType = CatalogTableType.MANAGED,
+  storage = CatalogStorageFormat.empty.copy(properties = options),
+  schema = schema.getOrElse(new StructType),
+  provider = Some(provider),
--- End diff --

It was a more general question which you have answered, sorry about that.

In this case the provider cannot be null: the grammar requires a provider 
and the call `ctx.tableProvider.qualifiedName.getText` would result in a NPE if 
it were null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73478351
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-logWarning(
-  s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is 
deprecated, " +
-s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-ExecutedCommandExec(
-  CreateTempViewUsing(
-c.tableIdent, c.userSpecifiedSchema, replace = true, 
c.provider, c.options)) :: Nil
-
-  case c: CreateTableUsing if !c.temporary =>
+  case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == 
"hive" =>
--- End diff --

Ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73478108
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 ---
@@ -329,47 +328,29 @@ class DDLCommandSuite extends PlanTest {
 
   test("create table - location implies external") {
 val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
-val ct = parseAs[CreateTableCommand](query)
-assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-assert(ct.table.storage.locationUri == Some("/something/anything"))
-  }
-
-  test("create table - column repeated in partitioning columns") {
-val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY 
(key INT, hr STRING)"
-val e = intercept[ParseException] { parser.parsePlan(query) }
-assert(e.getMessage.contains(
-  "Operation not allowed: Partition columns may not be specified in 
the schema: [\"key\"]"))
-  }
-
-  test("create table - duplicate column names in the table definition") {
--- End diff --

here too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73478093
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 ---
@@ -329,47 +328,29 @@ class DDLCommandSuite extends PlanTest {
 
   test("create table - location implies external") {
 val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
-val ct = parseAs[CreateTableCommand](query)
-assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-assert(ct.table.storage.locationUri == Some("/something/anything"))
-  }
-
-  test("create table - column repeated in partitioning columns") {
--- End diff --

moved it to `DDLSuite` as it's not a parser error anymore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73477850
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -319,12 +319,26 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 }
 val options = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val provider = ctx.tableProvider.qualifiedName.getText
+val schema = Option(ctx.colTypeList()).map(createStructType)
 val partitionColumnNames =
   Option(ctx.partitionColumnNames)
 .map(visitIdentifierList(_).toArray)
 .getOrElse(Array.empty[String])
 val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
+val tableDesc = CatalogTable(
+  identifier = table,
+  tableType = CatalogTableType.MANAGED,
+  storage = CatalogStorageFormat.empty.copy(properties = options),
+  schema = schema.getOrElse(new StructType),
+  provider = Some(provider),
--- End diff --

well, just look at the codes around, it's possible that `provider` is null. 
However, if we also look at the antlr file, the `provider` must be specified. 
The previous code doesn't check the null either, and will throw NPE somewhere 
if it's null. So I think we should use `Some` here to follow this behavior.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73477314
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -319,12 +319,26 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 }
 val options = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val provider = ctx.tableProvider.qualifiedName.getText
+val schema = Option(ctx.colTypeList()).map(createStructType)
 val partitionColumnNames =
   Option(ctx.partitionColumnNames)
 .map(visitIdentifierList(_).toArray)
 .getOrElse(Array.empty[String])
 val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
+val tableDesc = CatalogTable(
+  identifier = table,
+  tableType = CatalogTableType.MANAGED,
+  storage = CatalogStorageFormat.empty.copy(properties = options),
+  schema = schema.getOrElse(new StructType),
+  provider = Some(provider),
--- End diff --

yea, for example, 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala#L194


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73476814
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-logWarning(
-  s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is 
deprecated, " +
-s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-ExecutedCommandExec(
-  CreateTempViewUsing(
-c.tableIdent, c.userSpecifiedSchema, replace = true, 
c.provider, c.options)) :: Nil
-
-  case c: CreateTableUsing if !c.temporary =>
+  case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == 
"hive" =>
--- End diff --

yea, `provider` can be None in `CreateView`, that's why I add this check in 
`CreateTable` instead of `CatalogTable`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73475935
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -154,6 +274,21 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
 
   def apply(plan: LogicalPlan): Unit = {
 plan.foreach {
+  case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+// Since we are saving table metadata to metastore, we should make 
sure the table name
+// and database name don't break some common restrictions, e.g. 
special chars except
+// underscore are not allowed.
+val pattern = Pattern.compile("[\\w_]+")
--- End diff --

Compiling a pattern can be expensive and the `Pattern` produced is 
reusable; that is all. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73474964
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-logWarning(
-  s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is 
deprecated, " +
-s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-ExecutedCommandExec(
-  CreateTempViewUsing(
-c.tableIdent, c.userSpecifiedSchema, replace = true, 
c.provider, c.options)) :: Nil
-
-  case c: CreateTableUsing if !c.temporary =>
+  case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == 
"hive" =>
--- End diff --

Ok, can we just change `CatalogTable.provider` into a `String` then, 
instead of an `Option[String]`? Or is this public API which we cannot change? 
If so, it would be nice to add a getter method to the `CatalogTable`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73474573
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -319,12 +319,26 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 }
 val options = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val provider = ctx.tableProvider.qualifiedName.getText
+val schema = Option(ctx.colTypeList()).map(createStructType)
 val partitionColumnNames =
   Option(ctx.partitionColumnNames)
 .map(visitIdentifierList(_).toArray)
 .getOrElse(Array.empty[String])
 val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
+val tableDesc = CatalogTable(
+  identifier = table,
+  tableType = CatalogTableType.MANAGED,
+  storage = CatalogStorageFormat.empty.copy(properties = options),
+  schema = schema.getOrElse(new StructType),
+  provider = Some(provider),
--- End diff --

Is it still possible that a provide is `None`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73474139
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 throw new AnalysisException(s"Table $tableIdent already exists.")
 
   case _ =>
-val cmd =
-  CreateTableUsingAsSelect(
-tableIdent,
-source,
-
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-getBucketSpec,
-mode,
-extraOptions.toMap,
-df.logicalPlan)
+val tableDesc = CatalogTable(
+  identifier = tableIdent,
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat.empty.copy(properties = 
extraOptions.toMap),
+  schema = new StructType,
+  provider = Some(source),
+  partitionColumnNames = partitioningColumns.getOrElse(Nil),
+  bucketSpec = getBucketSpec
+)
+val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
--- End diff --

I know I am nit picking here :).

I have seen a few cases in which some thought the parameter was non-null 
and used `Some(...)` to wrap that; resulting in a very nice NPE down the line 
(which you don't expect in an `Option`). In this case you are totally right to 
use `Some(...)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73472950
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val selectQuery = Option(ctx.query).map(plan)
 
-// Ensuring whether no duplicate name is used in table definition
-val colNames = dataCols.map(_.name)
-if (colNames.length != colNames.distinct.length) {
-  val duplicateColumns = colNames.groupBy(identity).collect {
-case (x, ys) if ys.length > 1 => "\"" + x + "\""
-  }
-  operationNotAllowed(s"Duplicated column names found in table 
definition of $name: " +
-duplicateColumns.mkString("[", ",", "]"), ctx)
-}
-
-// For Hive tables, partition columns must not be part of the schema
--- End diff --

BTW, test added: 
https://github.com/apache/spark/pull/14482/files#diff-b7094baa12601424a5d19cb930e3402fR144


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73472871
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val selectQuery = Option(ctx.query).map(plan)
 
-// Ensuring whether no duplicate name is used in table definition
-val colNames = dataCols.map(_.name)
-if (colNames.length != colNames.distinct.length) {
-  val duplicateColumns = colNames.groupBy(identity).collect {
-case (x, ys) if ys.length > 1 => "\"" + x + "\""
-  }
-  operationNotAllowed(s"Duplicated column names found in table 
definition of $name: " +
-duplicateColumns.mkString("[", ",", "]"), ctx)
-}
-
-// For Hive tables, partition columns must not be part of the schema
--- End diff --

`CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT)`

for this query, the table definition is `(key int, value string, key int)`, 
and `PreprocessDDL` will catch and throw exception


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73471840
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some 
normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+// When we CREATE TABLE without specifying the table schema, we should 
fail the query if
+// bucketing information is specified, as we can't infer bucketing 
from data files currently,
+// and we should ignore the partition columns if it's specified, as we 
will infer it later, at
+// runtime.
+case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+  if (tableDesc.bucketSpec.isDefined) {
+failAnalysis("Cannot specify bucketing information if the table 
schema is not specified " +
+  "when creating and will be inferred at runtime")
+  }
+
+  val partitionColumnNames = tableDesc.partitionColumnNames
+  if (partitionColumnNames.nonEmpty) {
+// The table does not have a specified schema, which means that 
the schema will be inferred
+// at runtime. So, we are not expecting partition columns and we 
will discover partitions
+// at runtime. However, if there are specified partition columns, 
we simply ignore them and
+// provide a warning message.
+logWarning(
+  s"Specified partition columns 
(${partitionColumnNames.mkString(",")}) will be " +
+s"ignored. The schema and partition columns of table 
${tableDesc.identifier} will " +
+"be inferred.")
+c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+  } else {
+c
+  }
+
+// Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity
+// config, and do various checks:
+//   * column names in table definition can't be duplicated.
+//   * partition, bucket and sort column names must exist in table 
definition.
+//   * partition, bucket and sort column names can't be duplicated.
+//   * can't use all table columns as partition columns.
+//   * partition columns' type must be AtomicType.
+//   * sort columns' type must be orderable.
+case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+  val schema = if (query.isDefined) query.get.schema else 
tableDesc.schema
+  checkDuplication(schema.map(_.name), "table definition of " + 
tableDesc.identifier)
+
+  val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+  val bucketColsChecked = checkBucketColumns(schema, 
partitionColsChecked)
+  c.copy(tableDesc = bucketColsChecked)
+  }
+
+  private def checkPartitionColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+val normalizedPartitionCols = tableDesc.partitionColumnNames.map { 
colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"partition")
+}
+checkDuplication(normalizedPartitionCols, "partition")
+
+if (schema.nonEmpty && normalizedPartitionCols.length == 
schema.length) {
+  failAnalysis("Cannot use all columns for partition columns")
+}
+
+schema.filter(f => 
normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+  case _: AtomicType => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} for 
partition column")
+}
+
+tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+  }
+
+  private def checkBucketColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+tableDesc.bucketSpec match {
+  case Some(BucketSpec(numBuckets, bucketColumnNames, 
sortColumnNames)) =>
+val normalizedBucketCols = bucketColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"bucket")
+}
+checkDuplication(normalizedBucketCols, "bucket")
+
+val normalizedSortCols = sortColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"sort")
+}
+checkDuplication(normalizedSortCols, "sort")
+
+schema.filter(f => 
normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+  case dt if RowOrdering.isOrderable(dt) => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} 
for 

[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73471330
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val selectQuery = Option(ctx.query).map(plan)
 
-// Ensuring whether no duplicate name is used in table definition
-val colNames = dataCols.map(_.name)
-if (colNames.length != colNames.distinct.length) {
-  val duplicateColumns = colNames.groupBy(identity).collect {
-case (x, ys) if ys.length > 1 => "\"" + x + "\""
-  }
-  operationNotAllowed(s"Duplicated column names found in table 
definition of $name: " +
-duplicateColumns.mkString("[", ",", "]"), ctx)
-}
-
-// For Hive tables, partition columns must not be part of the schema
--- End diff --

wait. I found a conflict here. 

For Hive Tables, the following query is not allowed. 
```
CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT)
```
However, for data source tables, this query is allowed with a minor change. 
```
CREATE TABLE tab1 (key INT, value STRING) USING json PARTITIONED BY (key)
```

Thus, they are different regarding whether partitioning columns should or 
should not be included in data columns.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73471131
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some 
normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+// When we CREATE TABLE without specifying the table schema, we should 
fail the query if
+// bucketing information is specified, as we can't infer bucketing 
from data files currently,
+// and we should ignore the partition columns if it's specified, as we 
will infer it later, at
+// runtime.
+case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+  if (tableDesc.bucketSpec.isDefined) {
+failAnalysis("Cannot specify bucketing information if the table 
schema is not specified " +
+  "when creating and will be inferred at runtime")
+  }
+
+  val partitionColumnNames = tableDesc.partitionColumnNames
+  if (partitionColumnNames.nonEmpty) {
+// The table does not have a specified schema, which means that 
the schema will be inferred
+// at runtime. So, we are not expecting partition columns and we 
will discover partitions
+// at runtime. However, if there are specified partition columns, 
we simply ignore them and
+// provide a warning message.
+logWarning(
+  s"Specified partition columns 
(${partitionColumnNames.mkString(",")}) will be " +
+s"ignored. The schema and partition columns of table 
${tableDesc.identifier} will " +
+"be inferred.")
+c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+  } else {
+c
+  }
+
+// Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity
+// config, and do various checks:
+//   * column names in table definition can't be duplicated.
+//   * partition, bucket and sort column names must exist in table 
definition.
+//   * partition, bucket and sort column names can't be duplicated.
+//   * can't use all table columns as partition columns.
+//   * partition columns' type must be AtomicType.
+//   * sort columns' type must be orderable.
+case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+  val schema = if (query.isDefined) query.get.schema else 
tableDesc.schema
+  checkDuplication(schema.map(_.name), "table definition of " + 
tableDesc.identifier)
+
+  val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+  val bucketColsChecked = checkBucketColumns(schema, 
partitionColsChecked)
+  c.copy(tableDesc = bucketColsChecked)
+  }
+
+  private def checkPartitionColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+val normalizedPartitionCols = tableDesc.partitionColumnNames.map { 
colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"partition")
+}
+checkDuplication(normalizedPartitionCols, "partition")
+
+if (schema.nonEmpty && normalizedPartitionCols.length == 
schema.length) {
+  failAnalysis("Cannot use all columns for partition columns")
+}
+
+schema.filter(f => 
normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+  case _: AtomicType => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} for 
partition column")
+}
+
+tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+  }
+
+  private def checkBucketColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+tableDesc.bucketSpec match {
+  case Some(BucketSpec(numBuckets, bucketColumnNames, 
sortColumnNames)) =>
+val normalizedBucketCols = bucketColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"bucket")
+}
+checkDuplication(normalizedBucketCols, "bucket")
+
+val normalizedSortCols = sortColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"sort")
+}
+checkDuplication(normalizedSortCols, "sort")
+
+schema.filter(f => 
normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+  case dt if RowOrdering.isOrderable(dt) => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} 
for 

[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73470304
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val selectQuery = Option(ctx.query).map(plan)
 
-// Ensuring whether no duplicate name is used in table definition
-val colNames = dataCols.map(_.name)
-if (colNames.length != colNames.distinct.length) {
-  val duplicateColumns = colNames.groupBy(identity).collect {
-case (x, ys) if ys.length > 1 => "\"" + x + "\""
-  }
-  operationNotAllowed(s"Duplicated column names found in table 
definition of $name: " +
-duplicateColumns.mkString("[", ",", "]"), ctx)
-}
-
-// For Hive tables, partition columns must not be part of the schema
--- End diff --

uh, I see. I am fine about the new error message. Maybe keep the existing 
test cases and update the error messages. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73469783
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some 
normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+// When we CREATE TABLE without specifying the table schema, we should 
fail the query if
+// bucketing information is specified, as we can't infer bucketing 
from data files currently,
+// and we should ignore the partition columns if it's specified, as we 
will infer it later, at
+// runtime.
+case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+  if (tableDesc.bucketSpec.isDefined) {
+failAnalysis("Cannot specify bucketing information if the table 
schema is not specified " +
+  "when creating and will be inferred at runtime")
+  }
+
+  val partitionColumnNames = tableDesc.partitionColumnNames
+  if (partitionColumnNames.nonEmpty) {
+// The table does not have a specified schema, which means that 
the schema will be inferred
+// at runtime. So, we are not expecting partition columns and we 
will discover partitions
+// at runtime. However, if there are specified partition columns, 
we simply ignore them and
+// provide a warning message.
+logWarning(
+  s"Specified partition columns 
(${partitionColumnNames.mkString(",")}) will be " +
+s"ignored. The schema and partition columns of table 
${tableDesc.identifier} will " +
+"be inferred.")
+c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+  } else {
+c
+  }
+
+// Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity
+// config, and do various checks:
+//   * column names in table definition can't be duplicated.
+//   * partition, bucket and sort column names must exist in table 
definition.
+//   * partition, bucket and sort column names can't be duplicated.
+//   * can't use all table columns as partition columns.
+//   * partition columns' type must be AtomicType.
+//   * sort columns' type must be orderable.
+case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+  val schema = if (query.isDefined) query.get.schema else 
tableDesc.schema
+  checkDuplication(schema.map(_.name), "table definition of " + 
tableDesc.identifier)
+
+  val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+  val bucketColsChecked = checkBucketColumns(schema, 
partitionColsChecked)
+  c.copy(tableDesc = bucketColsChecked)
+  }
+
+  private def checkPartitionColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+val normalizedPartitionCols = tableDesc.partitionColumnNames.map { 
colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"partition")
+}
+checkDuplication(normalizedPartitionCols, "partition")
+
+if (schema.nonEmpty && normalizedPartitionCols.length == 
schema.length) {
+  failAnalysis("Cannot use all columns for partition columns")
+}
+
+schema.filter(f => 
normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+  case _: AtomicType => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} for 
partition column")
+}
+
+tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+  }
+
+  private def checkBucketColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+tableDesc.bucketSpec match {
+  case Some(BucketSpec(numBuckets, bucketColumnNames, 
sortColumnNames)) =>
+val normalizedBucketCols = bucketColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"bucket")
+}
+checkDuplication(normalizedBucketCols, "bucket")
+
+val normalizedSortCols = sortColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"sort")
+}
+checkDuplication(normalizedSortCols, "sort")
+
+schema.filter(f => 
normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+  case dt if RowOrdering.isOrderable(dt) => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} 
for 

[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73469639
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val selectQuery = Option(ctx.query).map(plan)
 
-// Ensuring whether no duplicate name is used in table definition
-val colNames = dataCols.map(_.name)
-if (colNames.length != colNames.distinct.length) {
-  val duplicateColumns = colNames.groupBy(identity).collect {
-case (x, ys) if ys.length > 1 => "\"" + x + "\""
-  }
-  operationNotAllowed(s"Duplicated column names found in table 
definition of $name: " +
-duplicateColumns.mkString("[", ",", "]"), ctx)
-}
-
-// For Hive tables, partition columns must not be part of the schema
--- End diff --

but if you think it in another way, for CREATE TABLE using hive syntax, the 
table schema is a union of data definition and partition columns. Then this 
exception is actually `duplicated columns in table schema`, which can be caught 
by `PreprocessDDL`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73469309
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some 
normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+// When we CREATE TABLE without specifying the table schema, we should 
fail the query if
+// bucketing information is specified, as we can't infer bucketing 
from data files currently,
+// and we should ignore the partition columns if it's specified, as we 
will infer it later, at
+// runtime.
+case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+  if (tableDesc.bucketSpec.isDefined) {
+failAnalysis("Cannot specify bucketing information if the table 
schema is not specified " +
+  "when creating and will be inferred at runtime")
+  }
+
+  val partitionColumnNames = tableDesc.partitionColumnNames
+  if (partitionColumnNames.nonEmpty) {
+// The table does not have a specified schema, which means that 
the schema will be inferred
+// at runtime. So, we are not expecting partition columns and we 
will discover partitions
+// at runtime. However, if there are specified partition columns, 
we simply ignore them and
+// provide a warning message.
+logWarning(
+  s"Specified partition columns 
(${partitionColumnNames.mkString(",")}) will be " +
+s"ignored. The schema and partition columns of table 
${tableDesc.identifier} will " +
+"be inferred.")
+c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+  } else {
+c
+  }
+
+// Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity
+// config, and do various checks:
+//   * column names in table definition can't be duplicated.
+//   * partition, bucket and sort column names must exist in table 
definition.
+//   * partition, bucket and sort column names can't be duplicated.
+//   * can't use all table columns as partition columns.
+//   * partition columns' type must be AtomicType.
+//   * sort columns' type must be orderable.
--- End diff --

because hive CREATE TABLE syntax is special, it will never hit this 
exception `partition column names must exist in table definition.` Instead, 
hive may specify partition column names that exist in table definition, and 
will hit `column names in table definition can't be duplicated`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73468039
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val selectQuery = Option(ctx.query).map(plan)
 
-// Ensuring whether no duplicate name is used in table definition
-val colNames = dataCols.map(_.name)
-if (colNames.length != colNames.distinct.length) {
-  val duplicateColumns = colNames.groupBy(identity).collect {
-case (x, ys) if ys.length > 1 => "\"" + x + "\""
-  }
-  operationNotAllowed(s"Duplicated column names found in table 
definition of $name: " +
-duplicateColumns.mkString("[", ",", "]"), ctx)
-}
-
-// For Hive tables, partition columns must not be part of the schema
--- End diff --

`PreprocessDDL` is unable to catch it. `schema` already combines `dataCols` 
and `partitionCols`. This checking is for detecting the duplicate columns 
between `dataCols` and `partitionCols`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73467801
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val selectQuery = Option(ctx.query).map(plan)
 
-// Ensuring whether no duplicate name is used in table definition
-val colNames = dataCols.map(_.name)
-if (colNames.length != colNames.distinct.length) {
-  val duplicateColumns = colNames.groupBy(identity).collect {
-case (x, ys) if ys.length > 1 => "\"" + x + "\""
-  }
-  operationNotAllowed(s"Duplicated column names found in table 
definition of $name: " +
-duplicateColumns.mkString("[", ",", "]"), ctx)
-}
-
-// For Hive tables, partition columns must not be part of the schema
--- End diff --

Because the checks in `PreprocessDDL` already covers them, and respect case 
sensitivity. Although the error message is different, but I think `duplicated 
columns found in table definition` also makes sense for hive serde tables that 
define partition column names that already in table schema.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73467821
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some 
normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+// When we CREATE TABLE without specifying the table schema, we should 
fail the query if
+// bucketing information is specified, as we can't infer bucketing 
from data files currently,
+// and we should ignore the partition columns if it's specified, as we 
will infer it later, at
+// runtime.
+case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+  if (tableDesc.bucketSpec.isDefined) {
+failAnalysis("Cannot specify bucketing information if the table 
schema is not specified " +
+  "when creating and will be inferred at runtime")
+  }
+
+  val partitionColumnNames = tableDesc.partitionColumnNames
+  if (partitionColumnNames.nonEmpty) {
+// The table does not have a specified schema, which means that 
the schema will be inferred
+// at runtime. So, we are not expecting partition columns and we 
will discover partitions
+// at runtime. However, if there are specified partition columns, 
we simply ignore them and
+// provide a warning message.
+logWarning(
+  s"Specified partition columns 
(${partitionColumnNames.mkString(",")}) will be " +
+s"ignored. The schema and partition columns of table 
${tableDesc.identifier} will " +
+"be inferred.")
+c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+  } else {
+c
+  }
+
+// Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity
+// config, and do various checks:
+//   * column names in table definition can't be duplicated.
+//   * partition, bucket and sort column names must exist in table 
definition.
+//   * partition, bucket and sort column names can't be duplicated.
+//   * can't use all table columns as partition columns.
+//   * partition columns' type must be AtomicType.
+//   * sort columns' type must be orderable.
--- End diff --

Yeah, these checking are general to both types. The only issue is here: 
```
partition, bucket and sort column names must exist in table definition.
```

However, the schema value from the parser combines both. Thus, Hive table 
creation pass the checks
```
val schema = StructType(dataCols ++ partitionCols)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73467467
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some 
normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+// When we CREATE TABLE without specifying the table schema, we should 
fail the query if
+// bucketing information is specified, as we can't infer bucketing 
from data files currently,
+// and we should ignore the partition columns if it's specified, as we 
will infer it later, at
+// runtime.
+case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+  if (tableDesc.bucketSpec.isDefined) {
+failAnalysis("Cannot specify bucketing information if the table 
schema is not specified " +
+  "when creating and will be inferred at runtime")
+  }
+
+  val partitionColumnNames = tableDesc.partitionColumnNames
+  if (partitionColumnNames.nonEmpty) {
+// The table does not have a specified schema, which means that 
the schema will be inferred
+// at runtime. So, we are not expecting partition columns and we 
will discover partitions
+// at runtime. However, if there are specified partition columns, 
we simply ignore them and
+// provide a warning message.
+logWarning(
+  s"Specified partition columns 
(${partitionColumnNames.mkString(",")}) will be " +
+s"ignored. The schema and partition columns of table 
${tableDesc.identifier} will " +
+"be inferred.")
+c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+  } else {
+c
+  }
+
+// Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity
+// config, and do various checks:
+//   * column names in table definition can't be duplicated.
+//   * partition, bucket and sort column names must exist in table 
definition.
+//   * partition, bucket and sort column names can't be duplicated.
+//   * can't use all table columns as partition columns.
+//   * partition columns' type must be AtomicType.
+//   * sort columns' type must be orderable.
+case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+  val schema = if (query.isDefined) query.get.schema else 
tableDesc.schema
+  checkDuplication(schema.map(_.name), "table definition of " + 
tableDesc.identifier)
+
+  val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+  val bucketColsChecked = checkBucketColumns(schema, 
partitionColsChecked)
+  c.copy(tableDesc = bucketColsChecked)
+  }
+
+  private def checkPartitionColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+val normalizedPartitionCols = tableDesc.partitionColumnNames.map { 
colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"partition")
+}
+checkDuplication(normalizedPartitionCols, "partition")
+
+if (schema.nonEmpty && normalizedPartitionCols.length == 
schema.length) {
+  failAnalysis("Cannot use all columns for partition columns")
+}
+
+schema.filter(f => 
normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+  case _: AtomicType => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} for 
partition column")
+}
+
+tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+  }
+
+  private def checkBucketColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+tableDesc.bucketSpec match {
+  case Some(BucketSpec(numBuckets, bucketColumnNames, 
sortColumnNames)) =>
+val normalizedBucketCols = bucketColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"bucket")
+}
+checkDuplication(normalizedBucketCols, "bucket")
+
+val normalizedSortCols = sortColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"sort")
+}
+checkDuplication(normalizedSortCols, "sort")
+
+schema.filter(f => 
normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+  case dt if RowOrdering.isOrderable(dt) => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} 
for 

[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73467397
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -206,22 +206,22 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
 // The relation in l is not an InsertableRelation.
 failAnalysis(s"$l does not allow insertion.")
 
-  case c: CreateTableUsingAsSelect =>
+  case CreateTable(tableDesc, mode, Some(query)) =>
--- End diff --

The exception is CREATE HIVE TABLE AS SELECT. So far, we assume all the 
unsupported cases have been captured in the DDL handling. Thus, I think it is 
safe to apply all the cases to Hive Tables. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73466961
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some 
normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+// When we CREATE TABLE without specifying the table schema, we should 
fail the query if
+// bucketing information is specified, as we can't infer bucketing 
from data files currently,
+// and we should ignore the partition columns if it's specified, as we 
will infer it later, at
+// runtime.
+case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+  if (tableDesc.bucketSpec.isDefined) {
+failAnalysis("Cannot specify bucketing information if the table 
schema is not specified " +
+  "when creating and will be inferred at runtime")
+  }
+
+  val partitionColumnNames = tableDesc.partitionColumnNames
+  if (partitionColumnNames.nonEmpty) {
+// The table does not have a specified schema, which means that 
the schema will be inferred
+// at runtime. So, we are not expecting partition columns and we 
will discover partitions
+// at runtime. However, if there are specified partition columns, 
we simply ignore them and
+// provide a warning message.
+logWarning(
+  s"Specified partition columns 
(${partitionColumnNames.mkString(",")}) will be " +
+s"ignored. The schema and partition columns of table 
${tableDesc.identifier} will " +
+"be inferred.")
+c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+  } else {
+c
+  }
+
+// Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity
+// config, and do various checks:
+//   * column names in table definition can't be duplicated.
+//   * partition, bucket and sort column names must exist in table 
definition.
+//   * partition, bucket and sort column names can't be duplicated.
+//   * can't use all table columns as partition columns.
+//   * partition columns' type must be AtomicType.
+//   * sort columns' type must be orderable.
+case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+  val schema = if (query.isDefined) query.get.schema else 
tableDesc.schema
+  checkDuplication(schema.map(_.name), "table definition of " + 
tableDesc.identifier)
+
+  val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+  val bucketColsChecked = checkBucketColumns(schema, 
partitionColsChecked)
+  c.copy(tableDesc = bucketColsChecked)
+  }
+
+  private def checkPartitionColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+val normalizedPartitionCols = tableDesc.partitionColumnNames.map { 
colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"partition")
+}
+checkDuplication(normalizedPartitionCols, "partition")
+
+if (schema.nonEmpty && normalizedPartitionCols.length == 
schema.length) {
+  failAnalysis("Cannot use all columns for partition columns")
+}
+
+schema.filter(f => 
normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+  case _: AtomicType => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} for 
partition column")
+}
+
+tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+  }
+
+  private def checkBucketColumns(schema: StructType, tableDesc: 
CatalogTable): CatalogTable = {
+tableDesc.bucketSpec match {
+  case Some(BucketSpec(numBuckets, bucketColumnNames, 
sortColumnNames)) =>
+val normalizedBucketCols = bucketColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"bucket")
+}
+checkDuplication(normalizedBucketCols, "bucket")
+
+val normalizedSortCols = sortColumnNames.map { colName =>
+  normalizeColumnName(tableDesc.identifier, schema, colName, 
"sort")
+}
+checkDuplication(normalizedSortCols, "sort")
+
+schema.filter(f => 
normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+  case dt if RowOrdering.isOrderable(dt) => // OK
+  case other => failAnalysis(s"Cannot use ${other.simpleString} 
for 

[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73466174
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
 val properties = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
 val selectQuery = Option(ctx.query).map(plan)
 
-// Ensuring whether no duplicate name is used in table definition
-val colNames = dataCols.map(_.name)
-if (colNames.length != colNames.distinct.length) {
-  val duplicateColumns = colNames.groupBy(identity).collect {
-case (x, ys) if ys.length > 1 => "\"" + x + "\""
-  }
-  operationNotAllowed(s"Duplicated column names found in table 
definition of $name: " +
-duplicateColumns.mkString("[", ",", "]"), ctx)
-}
-
-// For Hive tables, partition columns must not be part of the schema
--- End diff --

This is specific to Hive tables. Any reason why we remove this checking?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73465089
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---
@@ -19,50 +19,25 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.types._
 
+case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: 
Option[LogicalPlan])
+  extends LogicalPlan {
+  assert(tableDesc.provider.isDefined, "The table to be created must have 
a provider.")
 
-/**
- * Used to represent the operation of create table using a data source.
- *
- * @param allowExisting If it is true, we will do nothing when the table 
already exists.
- *  If it is false, an exception will be thrown
- */
-case class CreateTableUsing(
-tableIdent: TableIdentifier,
-userSpecifiedSchema: Option[StructType],
-provider: String,
-temporary: Boolean,
-options: Map[String, String],
-partitionColumns: Array[String],
-bucketSpec: Option[BucketSpec],
-allowExisting: Boolean,
-managedIfNoPath: Boolean) extends LogicalPlan with logical.Command {
-
-  override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
-}
+  if (query.isEmpty) {
+assert(
+  mode == SaveMode.ErrorIfExists || mode == SaveMode.Ignore,
+  "create table without data insertion can only use ErrorIfExists or 
Ignore as SaveMode.")
+  }
 
-/**
- * A node used to support CTAS statements and saveAsTable for the data 
source API.
- * This node is a [[logical.UnaryNode]] instead of a [[logical.Command]] 
because we want the
- * analyzer can analyze the logical plan that will be used to populate the 
table.
- * So, [[PreWriteCheck]] can detect cases that are not allowed.
- */
-case class CreateTableUsingAsSelect(
-tableIdent: TableIdentifier,
-provider: String,
-partitionColumns: Array[String],
-bucketSpec: Option[BucketSpec],
-mode: SaveMode,
-options: Map[String, String],
-child: LogicalPlan) extends logical.UnaryNode {
   override def output: Seq[Attribute] = Seq.empty[Attribute]
+
+  override def children: Seq[LogicalPlan] = query.toSeq
--- End diff --

This is great! Sometimes, the plan of `query` could be not analyzed at the 
end. This resolves an existing bug. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73464594
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -154,6 +274,21 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
 
   def apply(plan: LogicalPlan): Unit = {
 plan.foreach {
+  case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+// Since we are saving table metadata to metastore, we should make 
sure the table name
+// and database name don't break some common restrictions, e.g. 
special chars except
+// underscore are not allowed.
+val pattern = Pattern.compile("[\\w_]+")
--- End diff --

cc @hvanhovell , I think this is the only place that we need this check, as 
`CreateTable` is the only plan that can save a table metadata into metastore. 
what do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73464308
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -206,22 +206,22 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
 // The relation in l is not an InsertableRelation.
 failAnalysis(s"$l does not allow insertion.")
 
-  case c: CreateTableUsingAsSelect =>
+  case CreateTable(tableDesc, mode, Some(query)) =>
--- End diff --

Now this rule only checks `if the table is an input table of the query`, it 
won't do anything for hive serde tables.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73464228
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some 
normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+// When we CREATE TABLE without specifying the table schema, we should 
fail the query if
+// bucketing information is specified, as we can't infer bucketing 
from data files currently,
+// and we should ignore the partition columns if it's specified, as we 
will infer it later, at
+// runtime.
+case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+  if (tableDesc.bucketSpec.isDefined) {
+failAnalysis("Cannot specify bucketing information if the table 
schema is not specified " +
+  "when creating and will be inferred at runtime")
+  }
+
+  val partitionColumnNames = tableDesc.partitionColumnNames
+  if (partitionColumnNames.nonEmpty) {
+// The table does not have a specified schema, which means that 
the schema will be inferred
+// at runtime. So, we are not expecting partition columns and we 
will discover partitions
+// at runtime. However, if there are specified partition columns, 
we simply ignore them and
+// provide a warning message.
+logWarning(
+  s"Specified partition columns 
(${partitionColumnNames.mkString(",")}) will be " +
+s"ignored. The schema and partition columns of table 
${tableDesc.identifier} will " +
+"be inferred.")
+c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+  } else {
+c
+  }
+
+// Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity
+// config, and do various checks:
+//   * column names in table definition can't be duplicated.
+//   * partition, bucket and sort column names must exist in table 
definition.
+//   * partition, bucket and sort column names can't be duplicated.
+//   * can't use all table columns as partition columns.
+//   * partition columns' type must be AtomicType.
+//   * sort columns' type must be orderable.
--- End diff --

cc @gatorsmile , I think all this checks are general and can be applied to 
hive serde tables too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73464090
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-logWarning(
-  s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is 
deprecated, " +
-s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-ExecutedCommandExec(
-  CreateTempViewUsing(
-c.tableIdent, c.userSpecifiedSchema, replace = true, 
c.provider, c.options)) :: Nil
-
-  case c: CreateTableUsing if !c.temporary =>
+  case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == 
"hive" =>
--- End diff --

no, the `provider` is always defined, see 
https://github.com/apache/spark/pull/14482/files#diff-ea32a127bbe0c2bab24b0bbc8c333982R30


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73461786
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -233,12 +233,11 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
 }
 
 PartitioningUtils.validatePartitionColumn(
-  c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
+  query.schema, tableDesc.partitionColumnNames, 
conf.caseSensitiveAnalysis)
 
 for {
-  spec <- c.bucketSpec
-  sortColumnName <- spec.sortColumnNames
-  sortColumn <- c.child.schema.find(_.name == sortColumnName)
+  spec <- tableDesc.bucketSpec
+  sortColumn <- 
tableDesc.schema.filter(spec.sortColumnNames.contains)
--- End diff --

Below is the logics for bucketing tables. If we do not plan to support Hive 
bucketing tables, maybe we just issue an exception? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73461643
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -233,12 +233,11 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
 }
 
 PartitioningUtils.validatePartitionColumn(
-  c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
+  query.schema, tableDesc.partitionColumnNames, 
conf.caseSensitiveAnalysis)
--- End diff --

`validatePartitionColumn` is for data source tables only. Right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73461508
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -206,22 +206,22 @@ private[sql] case class PreWriteCheck(conf: SQLConf, 
catalog: SessionCatalog)
 // The relation in l is not an InsertableRelation.
 failAnalysis(s"$l does not allow insertion.")
 
-  case c: CreateTableUsingAsSelect =>
+  case CreateTable(tableDesc, mode, Some(query)) =>
--- End diff --

Previously, this is only applicable to Data Source tables. After this 
change, this is also applicable to Create Hive Table As Select. Thus, some 
validation might not be right to Hive tables. We have to be careful to check 
them one by one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73458290
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 throw new AnalysisException(s"Table $tableIdent already exists.")
 
   case _ =>
-val cmd =
-  CreateTableUsingAsSelect(
-tableIdent,
-source,
-
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-getBucketSpec,
-mode,
-extraOptions.toMap,
-df.logicalPlan)
+val tableDesc = CatalogTable(
+  identifier = tableIdent,
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat.empty.copy(properties = 
extraOptions.toMap),
+  schema = new StructType,
+  provider = Some(source),
+  partitionColumnNames = partitioningColumns.getOrElse(Nil),
+  bucketSpec = getBucketSpec
+)
+val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
--- End diff --

hmmm, do we have to use `Option` even though the parameter is guaranteed to 
be not null?

For this case, we can't use `Option`, or the behaviour will be changed. 
Previously if `df.logicalPlan` is null, it's a bug and we will throw NPE 
somewhere. If we use `Option` here, then we are silently converting a CTAS to 
CREATE TABLE, which is not expected.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73457639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -61,6 +64,38 @@ trait CheckAnalysis extends PredicateHelper {
 }
   }
 
+  private def checkColumnNames(tableDesc: CatalogTable): Unit = {
+val colNames = tableDesc.schema.map(_.name)
+val colNamesSet = colNames.toSet
+checkDuplicatedColumnNames(colNames, colNamesSet, "table definition of 
" + tableDesc.identifier)
+
+def requireSubsetOfSchema(subColNames: Seq[String], colType: String): 
Unit = {
+  val subColNamesSet = subColNames.toSet
+  checkDuplicatedColumnNames(subColNames, subColNamesSet, colType)
+  if (!subColNamesSet.subsetOf(colNamesSet)) {
+failAnalysis(s"$colType columns (${subColNames.mkString(", ")}) 
must be a subset of " +
+  s"schema (${colNames.mkString(", ")}) in table 
'${tableDesc.identifier}'")
+  }
+}
+
+// Verify that the provided columns are part of the schema
+requireSubsetOfSchema(tableDesc.partitionColumnNames, "partition")
+
requireSubsetOfSchema(tableDesc.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil),
 "bucket")
+
requireSubsetOfSchema(tableDesc.bucketSpec.map(_.sortColumnNames).getOrElse(Nil),
 "sort")
+  }
+
+  private def checkDuplicatedColumnNames(
+  colNames: Seq[String],
+  colNamesSet: Set[String],
+  colType: String): Unit = {
+if (colNamesSet.size != colNames.length) {
+  val duplicateColumns = colNames.groupBy(identity).collect {
+case (x, ys) if ys.length > 1 => quoteIdentifier(x)
+  }
--- End diff --

we should, but the previous code doesn't consider case sensitivity either, 
we can do it in follow-ups.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73457371
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -349,6 +384,27 @@ trait CheckAnalysis extends PredicateHelper {
  |${s.catalogTable.identifier}
""".stripMargin)
 
+  case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+// Since we are saving table metadata to metastore, we should 
make sure the table name
+// and database name don't break some common restrictions, 
e.g. special chars except
+// underscore are not allowed.
+val pattern = Pattern.compile("[\\w_]+")
+if (!pattern.matcher(tableDesc.identifier.table).matches()) {
+  failAnalysis(s"Table name ${tableDesc.identifier.table} is 
not a valid name for " +
+s"metastore, it only accepts table name containing 
characters, numbers and _.")
+}
+if (tableDesc.identifier.database.isDefined &&
+  
!pattern.matcher(tableDesc.identifier.database.get).matches()) {
+  failAnalysis(s"Database name ${tableDesc.identifier.table} 
is not a valid name for " +
--- End diff --

`${tableDesc.identifier.table}` -> `${tableDesc.identifier.database.get}`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73408103
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -61,6 +64,38 @@ trait CheckAnalysis extends PredicateHelper {
 }
   }
 
+  private def checkColumnNames(tableDesc: CatalogTable): Unit = {
+val colNames = tableDesc.schema.map(_.name)
+val colNamesSet = colNames.toSet
+checkDuplicatedColumnNames(colNames, colNamesSet, "table definition of 
" + tableDesc.identifier)
+
+def requireSubsetOfSchema(subColNames: Seq[String], colType: String): 
Unit = {
+  val subColNamesSet = subColNames.toSet
+  checkDuplicatedColumnNames(subColNames, subColNamesSet, colType)
+  if (!subColNamesSet.subsetOf(colNamesSet)) {
+failAnalysis(s"$colType columns (${subColNames.mkString(", ")}) 
must be a subset of " +
+  s"schema (${colNames.mkString(", ")}) in table 
'${tableDesc.identifier}'")
+  }
+}
+
+// Verify that the provided columns are part of the schema
+requireSubsetOfSchema(tableDesc.partitionColumnNames, "partition")
+
requireSubsetOfSchema(tableDesc.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil),
 "bucket")
+
requireSubsetOfSchema(tableDesc.bucketSpec.map(_.sortColumnNames).getOrElse(Nil),
 "sort")
+  }
+
+  private def checkDuplicatedColumnNames(
+  colNames: Seq[String],
+  colNamesSet: Set[String],
+  colType: String): Unit = {
+if (colNamesSet.size != colNames.length) {
+  val duplicateColumns = colNames.groupBy(identity).collect {
+case (x, ys) if ys.length > 1 => quoteIdentifier(x)
+  }
--- End diff --

Do we need to consider the case sensitivity here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73390262
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-logWarning(
-  s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is 
deprecated, " +
-s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-ExecutedCommandExec(
-  CreateTempViewUsing(
-c.tableIdent, c.userSpecifiedSchema, replace = true, 
c.provider, c.options)) :: Nil
-
-  case c: CreateTableUsing if !c.temporary =>
+  case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == 
"hive" =>
--- End diff --

`tableDesc.provider.contains("hive")`? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73389058
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
 def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-logWarning(
-  s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is 
deprecated, " +
-s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-ExecutedCommandExec(
-  CreateTempViewUsing(
-c.tableIdent, c.userSpecifiedSchema, replace = true, 
c.provider, c.options)) :: Nil
-
-  case c: CreateTableUsing if !c.temporary =>
+  case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == 
"hive" =>
--- End diff --

Is it possible for the `tableDesc.provider` to be `None`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73388435
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 throw new AnalysisException(s"Table $tableIdent already exists.")
 
   case _ =>
-val cmd =
-  CreateTableUsingAsSelect(
-tableIdent,
-source,
-
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-getBucketSpec,
-mode,
-extraOptions.toMap,
-df.logicalPlan)
+val tableDesc = CatalogTable(
+  identifier = tableIdent,
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat.empty.copy(properties = 
extraOptions.toMap),
+  schema = new StructType,
+  provider = Some(source),
+  partitionColumnNames = partitioningColumns.getOrElse(Nil),
+  bucketSpec = getBucketSpec
+)
+val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
--- End diff --

NIT: we really shouldn't create use the `Some(...)` use `Option(...)` 
instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73387761
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -112,6 +111,8 @@ case class BucketSpec(
  * Note that Hive's metastore also tracks skewed columns. We should 
consider adding that in the
  * future once we have a better understanding of how we want to handle 
skewed columns.
  *
+ * @param provider the name of the data source provider for this table, 
e.g. parquet, json, etc.
+ * Can be None if this table is View, should be "hive" for 
hive serde tables.
--- End diff --

NIT NIT NIT: ```is a View```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73387447
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -349,6 +384,27 @@ trait CheckAnalysis extends PredicateHelper {
  |${s.catalogTable.identifier}
""".stripMargin)
 
+  case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+// Since we are saving table metadata to metastore, we should 
make sure the table name
+// and database name don't break some common restrictions, 
e.g. special chars except
+// underscore are not allowed.
+val pattern = Pattern.compile("[\\w_]+")
+if (!pattern.matcher(tableDesc.identifier.table).matches()) {
--- End diff --

Put tableDesc.identifier in a val  (saves typing).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14482#discussion_r73387263
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 ---
@@ -349,6 +384,27 @@ trait CheckAnalysis extends PredicateHelper {
  |${s.catalogTable.identifier}
""".stripMargin)
 
+  case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+// Since we are saving table metadata to metastore, we should 
make sure the table name
+// and database name don't break some common restrictions, 
e.g. special chars except
+// underscore are not allowed.
+val pattern = Pattern.compile("[\\w_]+")
--- End diff --

Move this somewhere we can reuse it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...

2016-08-03 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/14482

[SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS

## What changes were proposed in this pull request?

we have various logical plans for CREATE TABLE and CTAS: 
`CreateTableUsing`, `CreateTableUsingAsSelect`, 
`CreateHiveTableAsSelectLogicalPlan`. This PR unifies them to reduce the 
complexity and centralize the error handling.

## How was this patch tested?

existing tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark table

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14482


commit a11d122343189c4ed5b4be7dbbd5f1f8fdf02f57
Author: Wenchen Fan 
Date:   2016-08-03T15:45:35Z

unify logical plans for CREATE TABLE and CTAS




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org