[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14482
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73643735

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
```
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-        logWarning(
-          s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is deprecated, " +
-            s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-        ExecutedCommandExec(
-          CreateTempViewUsing(
-            c.tableIdent, c.userSpecifiedSchema, replace = true, c.provider, c.options)) :: Nil
-
-      case c: CreateTableUsing if !c.temporary =>
+      case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" =>
+        val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
+        ExecutedCommandExec(cmd) :: Nil
+
+      case CreateTable(tableDesc, mode, None) =>
         val cmd = CreateDataSourceTableCommand(
-          c.tableIdent,
-          c.userSpecifiedSchema,
-          c.provider,
-          c.options,
-          c.partitionColumns,
-          c.bucketSpec,
-          c.allowExisting,
-          c.managedIfNoPath)
+          tableDesc.identifier,
+          if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else None,
+          tableDesc.provider.get,
+          tableDesc.storage.properties,
+          tableDesc.partitionColumnNames.toArray,
+          tableDesc.bucketSpec,
+          ignoreIfExists = mode == SaveMode.Ignore,
+          managedIfNoPath = tableDesc.tableType == CatalogTableType.MANAGED)
```
--- End diff --

comment added: https://github.com/apache/spark/pull/14482/files#diff-1bb4f7bd5a2656f48bcd3c857167a11bR331
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73638320

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
```
@@ -272,18 +259,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       schema: StructType,
       options: Map[String, String]): DataFrame = {
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cmd =
-      CreateTableUsing(
-        tableIdent,
-        userSpecifiedSchema = Some(schema),
-        source,
-        temporary = false,
-        options,
-        partitionColumns = Array.empty[String],
-        bucketSpec = None,
-        allowExisting = false,
-        managedIfNoPath = false)
-    sparkSession.sessionState.executePlan(cmd).toRdd
+    val tableDesc = CatalogTable(
+      identifier = tableIdent,
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat.empty.copy(properties = options),
+      schema = schema,
+      provider = Some(source)
+    )
+    val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
```
--- End diff --

Good catch! Yea, we should disable hive for now.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73574347

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala (same hunk as quoted above; the comment is on the `val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)` line) ---

If so, we need to populate the `location` of `storage`. Maybe we just disable the option `hive` and issue an exception when users do it?
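A minimal sketch of the guard being suggested here (hypothetical: the method name, message, and exception type are assumptions, not code from this PR):

```scala
// Hypothetical guard: fail fast when a caller passes "hive" as the data
// source, instead of silently creating a Hive serde table through the
// Catalog API.
def validateProvider(source: String): Unit = {
  if (source.equalsIgnoreCase("hive")) {
    throw new IllegalArgumentException(
      "Cannot create a Hive serde table with the createExternalTable API; " +
        "use CREATE TABLE ... USING in SQL instead")
  }
}
```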
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73568017

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (same hunk as quoted above; the comment is on the `managedIfNoPath = tableDesc.tableType == CatalogTableType.MANAGED` line) ---

A little bit tricky here, but the change is not a bug. Maybe leave a TODO here? When we reimplement the write path, we can simplify the logic that determines the table type.
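For context, a sketch of the table-type decision that a reimplemented write path could centralize (an assumption based on what the `managedIfNoPath` flag encodes; not code from this PR):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// Hypothetical helper: a table with a user-specified path is external,
// otherwise it is managed -- roughly what "managedIfNoPath" means today.
def tableTypeFor(options: Map[String, String]): CatalogTableType = {
  if (options.contains("path")) CatalogTableType.EXTERNAL
  else CatalogTableType.MANAGED
}
```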
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73557584

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala (same hunk as quoted above; the comment is on the `val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)` line) ---

This change will have an external impact. The value of `source` is input from users, which means it could be "hive", and thus users could create a Hive serde table. If this is desired, we need to document it and test it.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73553259

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
```
@@ -223,20 +223,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cmd =
-      CreateTableUsing(
-        tableIdent,
-        userSpecifiedSchema = None,
-        source,
-        temporary = false,
-        options = options,
-        partitionColumns = Array.empty[String],
-        bucketSpec = None,
-        allowExisting = false,
-        managedIfNoPath = false)
-    sparkSession.sessionState.executePlan(cmd).toRdd
-    sparkSession.table(tableIdent)
+    createExternalTable(tableName, source, new StructType, options)
```
--- End diff --

This change will have an external impact. The value of `source` is input from users, which means it could be "hive", and thus users could create a Hive serde table. If this is desired, we need to document it and test it.
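To make the concern concrete, a hypothetical call (assuming a `spark` `SparkSession` in scope; the table name and path are made up):

```scala
// With the unified CreateTable plan, the user-supplied source string flows
// straight into CatalogTable.provider, so passing "hive" here would create
// a Hive serde table through the Catalog API unless explicitly rejected.
val df = spark.catalog.createExternalTable(
  "my_table",
  "hive",                                  // user-controlled provider string
  Map("path" -> "/some/existing/data"))
```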
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73478952

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
```
@@ -319,12 +319,26 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
+    val schema = Option(ctx.colTypeList()).map(createStructType)
     val partitionColumnNames = Option(ctx.partitionColumnNames)
       .map(visitIdentifierList(_).toArray)
       .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+    val tableDesc = CatalogTable(
+      identifier = table,
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(properties = options),
+      schema = schema.getOrElse(new StructType),
+      provider = Some(provider),
```
--- End diff --

It was a more general question which you have answered, sorry about that. In this case the provider cannot be null: the grammar requires a provider, and the call `ctx.tableProvider.qualifiedName.getText` would result in an NPE if it were null.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73478351

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (same hunk as quoted above; the comment is on the `case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive"` line) ---

Ok
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73478108

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala ---
```
@@ -329,47 +328,29 @@ class DDLCommandSuite extends PlanTest {
   test("create table - location implies external") {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
-    val ct = parseAs[CreateTableCommand](query)
-    assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-    assert(ct.table.storage.locationUri == Some("/something/anything"))
-  }
-
-  test("create table - column repeated in partitioning columns") {
-    val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)"
-    val e = intercept[ParseException] { parser.parsePlan(query) }
-    assert(e.getMessage.contains(
-      "Operation not allowed: Partition columns may not be specified in the schema: [\"key\"]"))
-  }
-
-  test("create table - duplicate column names in the table definition") {
```
--- End diff --

here too
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73478093

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala (same hunk as quoted above; the comment is on the removed test "create table - column repeated in partitioning columns") ---

moved it to `DDLSuite` as it's not a parser error anymore.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73477850

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the `provider = Some(provider),` line) ---

Well, just looking at the surrounding code, it's possible that `provider` is null. However, if we also look at the ANTLR file, the `provider` must be specified. The previous code didn't check for null either, and would throw an NPE somewhere if it were null. So I think we should use `Some` here to keep that behavior.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73477314

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the `provider = Some(provider),` line) ---

yea, for example, https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala#L194
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73476814

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (same hunk as quoted above; the comment is on the `case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive"` line) ---

Yea, `provider` can be `None` in `CreateView`; that's why I added this check in `CreateTable` instead of `CatalogTable`.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73475935

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---
```
@@ -154,6 +274,21 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
+      case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+        // Since we are saving table metadata to metastore, we should make sure the table name
+        // and database name don't break some common restrictions, e.g. special chars except
+        // underscore are not allowed.
+        val pattern = Pattern.compile("[\\w_]+")
```
--- End diff --

Compiling a pattern can be expensive and the `Pattern` produced is reusable; that is all.
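A self-contained sketch of the suggested fix, hoisting the compiled pattern into a reusable field (names are illustrative):

```scala
import java.util.regex.Pattern

object NameValidation {
  // Compile once and reuse: Pattern.compile is comparatively expensive,
  // and the resulting Pattern is immutable and thread-safe.
  private val validNamePattern: Pattern = Pattern.compile("[\\w_]+")

  def isValidName(name: String): Boolean =
    validNamePattern.matcher(name).matches()
}
```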
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73474964

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (same hunk as quoted above; the comment is on the `case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive"` line) ---

Ok, can we just change `CatalogTable.provider` into a `String` then, instead of an `Option[String]`? Or is this public API which we cannot change? If so, it would be nice to add a getter method to `CatalogTable`.
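One possible shape for such a getter, sketched on a stand-in class (the method name and error handling are assumptions, not part of `CatalogTable`):

```scala
// Hypothetical helper: surface a clear error instead of a bare
// NoSuchElementException from provider.get at arbitrary call sites.
case class CatalogTableLike(provider: Option[String]) {
  def requireProvider: String = provider.getOrElse(
    throw new IllegalStateException("CatalogTable has no provider defined"))
}
```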
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73474573

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the `provider = Some(provider),` line) ---

Is it still possible that a provider is `None`?
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73474139

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
```
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case _ =>
-        val cmd =
-          CreateTableUsingAsSelect(
-            tableIdent,
-            source,
-            partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-            getBucketSpec,
-            mode,
-            extraOptions.toMap,
-            df.logicalPlan)
+        val tableDesc = CatalogTable(
+          identifier = tableIdent,
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
+          schema = new StructType,
+          provider = Some(source),
+          partitionColumnNames = partitioningColumns.getOrElse(Nil),
+          bucketSpec = getBucketSpec
+        )
+        val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
```
--- End diff --

I know I am nit picking here :). I have seen a few cases in which someone thought the parameter was non-null and used `Some(...)` to wrap it, resulting in a very nice NPE down the line (which you don't expect in an `Option`). In this case you are totally right to use `Some(...)`.
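The hazard described above, as a minimal runnable example:

```scala
object SomeVsOption extends App {
  val maybeNull: String = null

  // Some(...) happily wraps null, so the NPE surfaces far from its cause:
  // wrapped.get returns null, and wrapped.get.length blows up much later.
  val wrapped: Option[String] = Some(maybeNull)

  // Option(...) converts null to None, failing fast and predictably.
  val safe: Option[String] = Option(maybeNull)

  println(wrapped) // Some(null)
  println(safe)    // None
}
```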
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73472950

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
```
@@ -933,23 +933,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)

-    // Ensuring whether no duplicate name is used in table definition
-    val colNames = dataCols.map(_.name)
-    if (colNames.length != colNames.distinct.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }
-      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-        duplicateColumns.mkString("[", ",", "]"), ctx)
-    }
-
-    // For Hive tables, partition columns must not be part of the schema
```
--- End diff --

BTW, test added: https://github.com/apache/spark/pull/14482/files#diff-b7094baa12601424a5d19cb930e3402fR144
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73472871

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the removed check "For Hive tables, partition columns must not be part of the schema") ---

For the query `CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT)`, the table definition is `(key int, value string, key int)`, and `PreprocessDDL` will catch this and throw an exception.
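A standalone sketch of the duplication check being relied on here (mirroring the removed parser check; names and the exception type are illustrative):

```scala
// Detect duplicated column names in a combined (data ++ partition) schema,
// as PreprocessDDL does for the unified CreateTable plan.
def checkDuplication(colNames: Seq[String], colType: String): Unit = {
  val duplicates = colNames.groupBy(identity).collect {
    case (name, occurrences) if occurrences.length > 1 => name
  }
  if (duplicates.nonEmpty) {
    throw new IllegalArgumentException(
      s"Found duplicate $colType column(s): ${duplicates.mkString(", ")}")
  }
}

// checkDuplication(Seq("key", "value", "key"), "table definition")
// => IllegalArgumentException: Found duplicate table definition column(s): key
```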
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73471840

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---
```
@@ -62,6 +66,122 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
 }

 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // When we CREATE TABLE without specifying the table schema, we should fail the query if
+    // bucketing information is specified, as we can't infer bucketing from data files currently,
+    // and we should ignore the partition columns if it's specified, as we will infer it later, at
+    // runtime.
+    case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+      if (tableDesc.bucketSpec.isDefined) {
+        failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
+          "when creating and will be inferred at runtime")
+      }
+
+      val partitionColumnNames = tableDesc.partitionColumnNames
+      if (partitionColumnNames.nonEmpty) {
+        // The table does not have a specified schema, which means that the schema will be inferred
+        // at runtime. So, we are not expecting partition columns and we will discover partitions
+        // at runtime. However, if there are specified partition columns, we simply ignore them and
+        // provide a warning message.
+        logWarning(
+          s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " +
+            s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " +
+            "be inferred.")
+        c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+      } else {
+        c
+      }
+
+    // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
+    // config, and do various checks:
+    //   * column names in table definition can't be duplicated.
+    //   * partition, bucket and sort column names must exist in table definition.
+    //   * partition, bucket and sort column names can't be duplicated.
+    //   * can't use all table columns as partition columns.
+    //   * partition columns' type must be AtomicType.
+    //   * sort columns' type must be orderable.
+    case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+      val schema = if (query.isDefined) query.get.schema else tableDesc.schema
+      checkDuplication(schema.map(_.name), "table definition of " + tableDesc.identifier)
+
+      val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+      val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
+      c.copy(tableDesc = bucketColsChecked)
+  }
+
+  private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+    val normalizedPartitionCols = tableDesc.partitionColumnNames.map { colName =>
+      normalizeColumnName(tableDesc.identifier, schema, colName, "partition")
+    }
+    checkDuplication(normalizedPartitionCols, "partition")
+
+    if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
+      failAnalysis("Cannot use all columns for partition columns")
+    }
+
+    schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+      case _: AtomicType => // OK
+      case other => failAnalysis(s"Cannot use ${other.simpleString} for partition column")
+    }
+
+    tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+  }
+
+  private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+    tableDesc.bucketSpec match {
+      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+        val normalizedBucketCols = bucketColumnNames.map { colName =>
+          normalizeColumnName(tableDesc.identifier, schema, colName, "bucket")
+        }
+        checkDuplication(normalizedBucketCols, "bucket")
+
+        val normalizedSortCols = sortColumnNames.map { colName =>
+          normalizeColumnName(tableDesc.identifier, schema, colName, "sort")
+        }
+        checkDuplication(normalizedSortCols, "sort")
+
+        schema.filter(f => normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+          case dt if RowOrdering.isOrderable(dt) => // OK
+          case other => failAnalysis(s"Cannot use ${other.simpleString} for
```
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73471330

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the removed check "For Hive tables, partition columns must not be part of the schema") ---

Wait, I found a conflict here. For Hive tables, the following query is not allowed.

```
CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT)
```

However, for data source tables, this query is allowed with a minor change.

```
CREATE TABLE tab1 (key INT, value STRING) USING json PARTITIONED BY (key)
```

Thus, they are different regarding whether partitioning columns should or should not be included in the data columns.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73471131

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala (same hunk as quoted above) ---
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73470304

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the removed check "For Hive tables, partition columns must not be part of the schema") ---

Uh, I see. I am fine with the new error message. Maybe keep the existing test cases and just update the expected error messages. Thanks!
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73469783

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala (same hunk as quoted above) ---
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73469639

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the removed check "For Hive tables, partition columns must not be part of the schema") ---

But if you think of it another way: for CREATE TABLE using Hive syntax, the table schema is a union of the data columns and the partition columns. Then this exception is actually `duplicated columns in table schema`, which can be caught by `PreprocessDDL`.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73469309

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala (same hunk as quoted above; the comment is on the check list ending at "sort columns' type must be orderable") ---

Because the Hive CREATE TABLE syntax is special, it will never hit the exception `partition column names must exist in table definition`. Instead, Hive may specify partition column names that already exist in the table definition, and will hit `column names in table definition can't be duplicated`.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73468039

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the removed check "For Hive tables, partition columns must not be part of the schema") ---

`PreprocessDDL` is unable to catch it. `schema` already combines `dataCols` and `partitionCols`. This check is for detecting duplicate columns between `dataCols` and `partitionCols`.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73467801

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala (same hunk as quoted above; the comment is on the removed check "For Hive tables, partition columns must not be part of the schema") ---

Because the checks in `PreprocessDDL` already cover them, and respect case sensitivity. Although the error message is different, I think `duplicated columns found in table definition` also makes sense for Hive serde tables that define partition column names that are already in the table schema.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73467821

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala --- (the same `PreprocessDDL` hunk quoted above)
--- End diff --

Yeah, these checks are general to both types. The only issue is here:
```
partition, bucket and sort column names must exist in table definition.
```
However, the schema value from the parser combines both. Thus, Hive table creation passes the check:
```
val schema = StructType(dataCols ++ partitionCols)
```
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73467467

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala --- (the same `PreprocessDDL` hunk quoted above, continuing with the new analysis case and its helper methods)
+    case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+      val schema = if (query.isDefined) query.get.schema else tableDesc.schema
+      checkDuplication(schema.map(_.name), "table definition of " + tableDesc.identifier)
+
+      val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+      val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
+      c.copy(tableDesc = bucketColsChecked)
+  }
+
+  private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+    val normalizedPartitionCols = tableDesc.partitionColumnNames.map { colName =>
+      normalizeColumnName(tableDesc.identifier, schema, colName, "partition")
+    }
+    checkDuplication(normalizedPartitionCols, "partition")
+
+    if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
+      failAnalysis("Cannot use all columns for partition columns")
+    }
+
+    schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+      case _: AtomicType => // OK
+      case other => failAnalysis(s"Cannot use ${other.simpleString} for partition column")
+    }
+
+    tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+  }
+
+  private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+    tableDesc.bucketSpec match {
+      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+        val normalizedBucketCols = bucketColumnNames.map { colName =>
+          normalizeColumnName(tableDesc.identifier, schema, colName, "bucket")
+        }
+        checkDuplication(normalizedBucketCols, "bucket")
+
+        val normalizedSortCols = sortColumnNames.map { colName =>
+          normalizeColumnName(tableDesc.identifier, schema, colName, "sort")
+        }
+        checkDuplication(normalizedSortCols, "sort")
+
+        schema.filter(f => normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+          case dt if RowOrdering.isOrderable(dt) => // OK
+          case other => failAnalysis(s"Cannot use ${other.simpleString} for sort column")
+        }
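The hunk above calls a `normalizeColumnName` helper whose body is not shown in this quote. A plausible sketch of such a helper (hypothetical body, not the verbatim PR code): resolve a user-supplied column name against the schema using string equality chosen by the case-sensitivity config, returning the schema's exact-cased name, or fail analysis if the column does not exist.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.StructType

private def normalizeColumnName(
    tableIdent: TableIdentifier,
    schema: StructType,
    colName: String,
    colType: String): String = {
  // Pick string equality according to the case-sensitivity config.
  val resolver: (String, String) => Boolean =
    if (conf.caseSensitiveAnalysis) _ == _ else _.equalsIgnoreCase(_)
  schema.fields
    .collectFirst { case field if resolver(field.name, colName) => field.name }
    .getOrElse(failAnalysis(s"$colType column $colName is not defined in table $tableIdent, " +
      s"defined table columns are: ${schema.fieldNames.mkString(", ")}"))
}
```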
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73467397

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---
@@ -206,22 +206,22 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")

-      case c: CreateTableUsingAsSelect =>
+      case CreateTable(tableDesc, mode, Some(query)) =>
--- End diff --

The exception is CREATE HIVE TABLE AS SELECT. So far, we assume all the unsupported cases have been captured in the DDL handling. Thus, I think it is safe to apply all these checks to Hive tables.
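For context, the check this rule still performs on CTAS is "the table being created must not be an input table of the query". A paraphrased sketch of the Overwrite-mode case (not the verbatim Spark source): collect the base relations read by the query and fail if the destination is among them.

```scala
case CreateTable(tableDesc, mode, Some(query)) =>
  // Only a table that already exists and is being overwritten can also be read
  // by the CTAS query, so that is the only case needing the self-read check.
  if (mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) {
    EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
      case LogicalRelation(dest: BaseRelation, _, _) =>
        val srcRelations = query.collect {
          case LogicalRelation(src: BaseRelation, _, _) => src
        }
        if (srcRelations.contains(dest)) {
          failAnalysis(s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
        }
      case _ => // the target is not a data source table; nothing to check here
    }
  }
```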
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73466961

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala --- (the same `PreprocessDDL` hunk and helper methods quoted above)
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73466174

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala --- (the same hunk quoted above, removing the duplicate-name check and the Hive partition-column check)
--- End diff --

This is specific to Hive tables. Any reason why we removed this check?
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73465089

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---
@@ -19,50 +19,25 @@ package org.apache.spark.sql.execution.datasources

 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.types._

+case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])
+  extends LogicalPlan {
+  assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")

-/**
- * Used to represent the operation of create table using a data source.
- *
- * @param allowExisting If it is true, we will do nothing when the table already exists.
- *                      If it is false, an exception will be thrown
- */
-case class CreateTableUsing(
-    tableIdent: TableIdentifier,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    temporary: Boolean,
-    options: Map[String, String],
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
-    allowExisting: Boolean,
-    managedIfNoPath: Boolean) extends LogicalPlan with logical.Command {
-
-  override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
-}
+  if (query.isEmpty) {
+    assert(
+      mode == SaveMode.ErrorIfExists || mode == SaveMode.Ignore,
+      "create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.")
+  }

-/**
- * A node used to support CTAS statements and saveAsTable for the data source API.
- * This node is a [[logical.UnaryNode]] instead of a [[logical.Command]] because we want the
- * analyzer can analyze the logical plan that will be used to populate the table.
- * So, [[PreWriteCheck]] can detect cases that are not allowed.
- */
-case class CreateTableUsingAsSelect(
-    tableIdent: TableIdentifier,
-    provider: String,
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
-    mode: SaveMode,
-    options: Map[String, String],
-    child: LogicalPlan) extends logical.UnaryNode {
   override def output: Seq[Attribute] = Seq.empty[Attribute]
+
+  override def children: Seq[LogicalPlan] = query.toSeq
--- End diff --

This is great! Sometimes the plan of `query` could end up not analyzed. This resolves an existing bug.
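A short illustration of why `children = query.toSeq` fixes that bug (illustrative sketch; `unresolvedQuery` is a stand-in name): once the CTAS query is a real child, `resolved` and the analyzer's tree traversals cover it automatically.

```scala
// With the query exposed as a child, analysis recurses into it like any other plan.
val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, Some(unresolvedQuery))
assert(plan.children == Seq(unresolvedQuery))
// plan.resolved stays false until unresolvedQuery resolves, so CheckAnalysis and
// PreWriteCheck see a fully analyzed CTAS query instead of an opaque field.
```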
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73464594

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---
@@ -154,6 +274,21 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)

   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
+      case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+        // Since we are saving table metadata to metastore, we should make sure the table name
+        // and database name don't break some common restrictions, e.g. special chars except
+        // underscore are not allowed.
+        val pattern = Pattern.compile("[\\w_]+")
--- End diff --

cc @hvanhovell, I think this is the only place where we need this check, as `CreateTable` is the only plan that can save table metadata into the metastore. What do you think?
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73464308

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala --- (the same `PreWriteCheck` hunk quoted above)
--- End diff --

Now this rule only checks whether the table is an input table of the query; it won't do anything for Hive serde tables.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73464228

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala --- (the same `PreprocessDDL` hunk quoted above)
--- End diff --

cc @gatorsmile, I think all these checks are general and can be applied to Hive serde tables too.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73464090

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan]
-      case c: CreateTableUsing if !c.temporary =>
+      case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" =>
--- End diff --

No, the `provider` is always defined, see https://github.com/apache/spark/pull/14482/files#diff-ea32a127bbe0c2bab24b0bbc8c333982R30
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73461786

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---
@@ -233,12 +233,11 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
       }

       PartitioningUtils.validatePartitionColumn(
-        c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
+        query.schema, tableDesc.partitionColumnNames, conf.caseSensitiveAnalysis)

       for {
-        spec <- c.bucketSpec
-        sortColumnName <- spec.sortColumnNames
-        sortColumn <- c.child.schema.find(_.name == sortColumnName)
+        spec <- tableDesc.bucketSpec
+        sortColumn <- tableDesc.schema.filter(spec.sortColumnNames.contains)
--- End diff --

Below is the logic for bucketed tables. If we do not plan to support Hive bucketed tables, maybe we should just issue an exception?
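A hypothetical guard along those lines (not part of this PR; a sketch only) could fail fast before any bucketing logic runs:

```scala
// Reject bucket specs on Hive serde tables until bucketing is supported there.
if (tableDesc.provider.contains("hive") && tableDesc.bucketSpec.isDefined) {
  failAnalysis("Creating bucketed Hive serde tables is not supported yet")
}
```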
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73461643

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala --- (the `validatePartitionColumn` lines from the hunk above)
--- End diff --

`validatePartitionColumn` is for data source tables only, right?
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73461508

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala --- (the same `PreWriteCheck` hunk quoted above)
--- End diff --

Previously, this was only applicable to data source tables. After this change, it also applies to CREATE HIVE TABLE AS SELECT. Thus, some validations might not be right for Hive tables. We have to be careful and check them one by one.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73458290

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case _ =>
-        val cmd =
-          CreateTableUsingAsSelect(
-            tableIdent,
-            source,
-            partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-            getBucketSpec,
-            mode,
-            extraOptions.toMap,
-            df.logicalPlan)
+        val tableDesc = CatalogTable(
+          identifier = tableIdent,
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
+          schema = new StructType,
+          provider = Some(source),
+          partitionColumnNames = partitioningColumns.getOrElse(Nil),
+          bucketSpec = getBucketSpec
+        )
+        val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
--- End diff --

Hmmm, do we have to use `Option` even when the parameter is guaranteed to be non-null? In this case we can't use `Option`, or the behaviour will change. Previously, if `df.logicalPlan` were null, it would be a bug and we would throw an NPE somewhere. If we use `Option` here, we silently convert a CTAS into a plain CREATE TABLE, which is not expected.
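The Scala semantics behind this point, for reference: `Option(x)` maps `null` to `None`, while `Some(x)` wraps whatever it is given, including `null`.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

val plan: LogicalPlan = null // stand-in for a hypothetical buggy caller

Option(plan) // None       -> the CTAS silently degrades to a plain CREATE TABLE
Some(plan)   // Some(null) -> still treated as CTAS; the null query fails loudly later
```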
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73457639

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
@@ -61,6 +64,38 @@ trait CheckAnalysis extends PredicateHelper {
     }
   }

+  private def checkColumnNames(tableDesc: CatalogTable): Unit = {
+    val colNames = tableDesc.schema.map(_.name)
+    val colNamesSet = colNames.toSet
+    checkDuplicatedColumnNames(colNames, colNamesSet, "table definition of " + tableDesc.identifier)
+
+    def requireSubsetOfSchema(subColNames: Seq[String], colType: String): Unit = {
+      val subColNamesSet = subColNames.toSet
+      checkDuplicatedColumnNames(subColNames, subColNamesSet, colType)
+      if (!subColNamesSet.subsetOf(colNamesSet)) {
+        failAnalysis(s"$colType columns (${subColNames.mkString(", ")}) must be a subset of " +
+          s"schema (${colNames.mkString(", ")}) in table '${tableDesc.identifier}'")
+      }
+    }
+
+    // Verify that the provided columns are part of the schema
+    requireSubsetOfSchema(tableDesc.partitionColumnNames, "partition")
+    requireSubsetOfSchema(tableDesc.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
+    requireSubsetOfSchema(tableDesc.bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
+  }
+
+  private def checkDuplicatedColumnNames(
+      colNames: Seq[String],
+      colNamesSet: Set[String],
+      colType: String): Unit = {
+    if (colNamesSet.size != colNames.length) {
+      val duplicateColumns = colNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => quoteIdentifier(x)
+      }
--- End diff --

We should, but the previous code doesn't consider case sensitivity either; we can do it in a follow-up.
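What the suggested follow-up could look like (a sketch only, not code from this PR, assuming a `conf: SQLConf` is in scope): fold names per the case-sensitivity config before grouping.

```scala
private def checkDuplicatedColumnNames(colNames: Seq[String], colType: String): Unit = {
  // Case-insensitive analysis treats "a" and "A" as the same column.
  val normalized = if (conf.caseSensitiveAnalysis) colNames else colNames.map(_.toLowerCase)
  if (normalized.distinct.length != normalized.length) {
    val duplicateColumns = normalized.groupBy(identity).collect {
      case (x, ys) if ys.length > 1 => quoteIdentifier(x)
    }
    failAnalysis(s"Found duplicate $colType columns: ${duplicateColumns.mkString(", ")}")
  }
}
```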
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73457371

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---
@@ -349,6 +384,27 @@ trait CheckAnalysis extends PredicateHelper {
               |${s.catalogTable.identifier}
             """.stripMargin)

+      case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+        // Since we are saving table metadata to metastore, we should make sure the table name
+        // and database name don't break some common restrictions, e.g. special chars except
+        // underscore are not allowed.
+        val pattern = Pattern.compile("[\\w_]+")
+        if (!pattern.matcher(tableDesc.identifier.table).matches()) {
+          failAnalysis(s"Table name ${tableDesc.identifier.table} is not a valid name for " +
+            s"metastore, it only accepts table name containing characters, numbers and _.")
+        }
+        if (tableDesc.identifier.database.isDefined &&
+            !pattern.matcher(tableDesc.identifier.database.get).matches()) {
+          failAnalysis(s"Database name ${tableDesc.identifier.table} is not a valid name for " +
--- End diff --

`${tableDesc.identifier.table}` -> `${tableDesc.identifier.database.get}`
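With that fix applied (and the identifier hoisted into a val, as also suggested later in this thread), the check would read roughly as follows (a sketch, not the merged code). Note that `\w` already matches the underscore, so `[\w_]+` is equivalent to `\w+`.

```scala
import java.util.regex.Pattern

val pattern = Pattern.compile("[\\w_]+")
val ident = tableDesc.identifier
if (!pattern.matcher(ident.table).matches()) {
  failAnalysis(s"Table name ${ident.table} is not a valid name for metastore, " +
    "it only accepts table names containing characters, numbers and _.")
}
ident.database.foreach { db =>
  if (!pattern.matcher(db).matches()) {
    failAnalysis(s"Database name $db is not a valid name for metastore, " +
      "it only accepts database names containing characters, numbers and _.")
  }
}
```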
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73408103

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- (the same `checkColumnNames`/`checkDuplicatedColumnNames` hunk quoted above)
--- End diff --

Do we need to consider the case sensitivity here?
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73390262

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- (the same `DDLStrategy` line quoted above: `case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" =>`)
--- End diff --

`tableDesc.provider.contains("hive")`?
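For reference, the plain Scala semantics behind the suggestion: `Option.contains` never throws, whereas `.get` fails on `None`.

```scala
val some: Option[String] = Some("hive")
val none: Option[String] = None

some.contains("hive") // true
none.contains("hive") // false
none.get == "hive"    // throws NoSuchElementException if evaluated
```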
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73389058

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- (the same `DDLStrategy` line quoted above)
--- End diff --

Is it possible for `tableDesc.provider` to be `None`?
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73388435

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- (the same `saveAsTable` hunk quoted above, ending with `val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))`)
--- End diff --

NIT: we really shouldn't use `Some(...)`; use `Option(...)` instead.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73387761

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
@@ -112,6 +111,8 @@ case class BucketSpec(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  *
+ * @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
+ *                 Can be None if this table is View, should be "hive" for hive serde tables.
--- End diff --

NIT NIT NIT: ```is a View```
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73387447

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- (the name-validation hunk quoted above, at `if (!pattern.matcher(tableDesc.identifier.table).matches()) {`)
--- End diff --

Put `tableDesc.identifier` in a val (saves typing).
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/14482#discussion_r73387263

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- (the name-validation hunk quoted above, at `val pattern = Pattern.compile("[\\w_]+")`)
--- End diff --

Move this somewhere we can reuse it.
[GitHub] spark pull request #14482: [SPARK-16879][SQL] unify logical plans for CREATE...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/14482

[SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS

## What changes were proposed in this pull request?

We have various logical plans for CREATE TABLE and CTAS: `CreateTableUsing`, `CreateTableUsingAsSelect`, `CreateHiveTableAsSelectLogicalPlan`. This PR unifies them to reduce the complexity and centralize the error handling.

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14482.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14482

commit a11d122343189c4ed5b4be7dbbd5f1f8fdf02f57
Author: Wenchen Fan
Date: 2016-08-03T15:45:35Z

    unify logical plans for CREATE TABLE and CTAS