[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225113719 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( --- End diff -- Yes, this is my understanding. You can check `DDLTask.dropPartitions`. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225108214 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( --- End diff -- So the implementation here is similar to how hive implements it? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225101976 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( --- End diff -- unfortunately, no. 
I checked https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java but I could not find one. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225101681 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala --- @@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression) override def newInstance(): NamedExpression = OuterReference(e.newInstance()) } +/** + * A place holder used to hold the name of the partition attributes specified when running commands + * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS. + */ +case class PartitioningAttribute(name: String) + extends Attribute with Unevaluable { + override val exprId: ExprId = NamedExpression.newExprId + // Not really needed and used. We just need a dataType to be used during analysis for resolving + // the expressions. The String type is used because all the literals in PARTITION operations are + // parsed as strings and eventually casted later. + override def dataType: DataType = StringType --- End diff -- I should probably improve the comment then. It's misleading: this is actually needed, because otherwise we may hit exceptions, since the `dataType` is checked when running `checkInputDataTypes` of the comparison operator containing it. I'll improve the comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225074416 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,31 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { --- End diff -- nit: can we move `withOrigin(ctx)` here? i.e. ``` def xxx(): T = withOrigin(ctx) { ... } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225073862 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala --- @@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression) override def newInstance(): NamedExpression = OuterReference(e.newInstance()) } +/** + * A place holder used to hold the name of the partition attributes specified when running commands + * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS. + */ +case class PartitioningAttribute(name: String) + extends Attribute with Unevaluable { + override val exprId: ExprId = NamedExpression.newExprId --- End diff -- Even if it's a fake attribute, we should not change the `exprId` when this expression gets copied. Can we move `exprId` to the constructor? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225074108 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala --- @@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression) override def newInstance(): NamedExpression = OuterReference(e.newInstance()) } +/** + * A place holder used to hold the name of the partition attributes specified when running commands + * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS. + */ +case class PartitioningAttribute(name: String) + extends Attribute with Unevaluable { + override val exprId: ExprId = NamedExpression.newExprId + // Not really needed and used. We just need a dataType to be used during analysis for resolving + // the expressions. The String type is used because all the literals in PARTITION operations are + // parsed as strings and eventually casted later. + override def dataType: DataType = StringType --- End diff -- If it's not needed, can we throw an exception here? We may need to override `toString` though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225076055 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( --- End diff -- does hive have an API to drop partitions with a predicate? 
I think the current approach is very inefficient for non-equality partition predicates. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r224082261 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( --- End diff -- sure, I will. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223913441 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( --- End diff -- But it's also weird to use `AttributeReference` this way. Can we create a new `Attribute` implementation for this purpose? Basically we only need a resolved expression to hold the partition column name. The type doesn't matter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223758511 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( --- End diff -- I thought about that. The point is that we have to check anyway that the attributes specified are the partitioning ones. So I am not sure it is worth running all the analyzer rules for something we have to handle somehow anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223753508 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( --- End diff -- Shall we make the table relation a child? Then we can resolve the `partitionsFilters` automatically. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223710579 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + table.identifier, resolvedSpecs, ignoreIfNotExists = 
ifExists, purge = purge, retainData = retainData) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } + def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = { +partitionFilterSpec.exists(!_.isInstanceOf[EqualTo]) + } + + def generatePartitionSpec( + partitionFilterSpec: Seq[Expression], + partitionColumns: Seq[String], + partitionAttributes: Map[String, Attribute], + tableIdentifier: TableIdentifier, + catalog: SessionCatalog, + resolver: Resolver, + timeZone: Option[String], + ifExists: Boolean): Seq[TablePartitionSpec] = { +val filters = partitionFilterSpec.map { pFilter => + pFilter.transform { +// Resolve the partition attributes +case partitionCol: Attribute => + val normalizedPartition = PartitioningUtils.normalizePartitionColumn( +partitionCol.name, +partitionColumns, +tableIdentifier.quotedString, +resolver) + partitionAttributes(normalizedPartition) + }.transform { +// Cast the partition value to the data type of the corresponding partition attribute +case cmp @ BinaryComparison(partitionAttr, value) +if !partitionAttr.dataType.sameType(value.dataType) => + cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone))) --- End diff -- ah nice catch, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223703615 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + table.identifier, resolvedSpecs, ignoreIfNotExists = 
ifExists, purge = purge, retainData = retainData) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } + def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = { +partitionFilterSpec.exists(!_.isInstanceOf[EqualTo]) + } + + def generatePartitionSpec( + partitionFilterSpec: Seq[Expression], + partitionColumns: Seq[String], + partitionAttributes: Map[String, Attribute], + tableIdentifier: TableIdentifier, + catalog: SessionCatalog, + resolver: Resolver, + timeZone: Option[String], + ifExists: Boolean): Seq[TablePartitionSpec] = { +val filters = partitionFilterSpec.map { pFilter => + pFilter.transform { +// Resolve the partition attributes +case partitionCol: Attribute => + val normalizedPartition = PartitioningUtils.normalizePartitionColumn( +partitionCol.name, +partitionColumns, +tableIdentifier.quotedString, +resolver) + partitionAttributes(normalizedPartition) + }.transform { +// Cast the partition value to the data type of the corresponding partition attribute +case cmp @ BinaryComparison(partitionAttr, value) +if !partitionAttr.dataType.sameType(value.dataType) => + cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone))) --- End diff -- The predicates are not actually converted to Hive's partition predicates. If it can't convert the predicates, `getPartitionsByFilter` will call `getAllPartitionsMethod` to fetch all partitions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223626350 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + table.identifier, resolvedSpecs, ignoreIfNotExists = 
ifExists, purge = purge, retainData = retainData) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } + def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = { +partitionFilterSpec.exists(!_.isInstanceOf[EqualTo]) + } + + def generatePartitionSpec( + partitionFilterSpec: Seq[Expression], + partitionColumns: Seq[String], + partitionAttributes: Map[String, Attribute], + tableIdentifier: TableIdentifier, + catalog: SessionCatalog, + resolver: Resolver, + timeZone: Option[String], + ifExists: Boolean): Seq[TablePartitionSpec] = { +val filters = partitionFilterSpec.map { pFilter => + pFilter.transform { +// Resolve the partition attributes +case partitionCol: Attribute => + val normalizedPartition = PartitioningUtils.normalizePartitionColumn( +partitionCol.name, +partitionColumns, +tableIdentifier.quotedString, +resolver) + partitionAttributes(normalizedPartition) + }.transform { +// Cast the partition value to the data type of the corresponding partition attribute +case cmp @ BinaryComparison(partitionAttr, value) +if !partitionAttr.dataType.sameType(value.dataType) => + cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone))) --- End diff -- Yes, please see the tests here: https://github.com/apache/spark/pull/20999/files/a964d2a7def5aed04bd362b3000b36583c0ba272#diff-b7094baa12601424a5d19cb930e3402fR663. Notice that value is always a `string` so in all cases with different datatypes we are using the cast. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223414695 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -1015,6 +1036,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val left = expression(ctx.left) val right = expression(ctx.right) val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] +buildComparison(left, right, operator) + } + + /** + * Creates a comparison expression. The following comparison operators are supported: + * - Equal: '=' or '==' + * - Null-safe Equal: '<=>' --- End diff -- Seems we can't support null-safe equality because it is not supported by Hive metastore partition predicate pushdown. See HiveShim.scala. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223415516 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + table.identifier, resolvedSpecs, ignoreIfNotExists = 
ifExists, purge = purge, retainData = retainData) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } + def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = { +partitionFilterSpec.exists(!_.isInstanceOf[EqualTo]) + } + + def generatePartitionSpec( + partitionFilterSpec: Seq[Expression], + partitionColumns: Seq[String], + partitionAttributes: Map[String, Attribute], + tableIdentifier: TableIdentifier, + catalog: SessionCatalog, + resolver: Resolver, + timeZone: Option[String], + ifExists: Boolean): Seq[TablePartitionSpec] = { +val filters = partitionFilterSpec.map { pFilter => + pFilter.transform { +// Resolve the partition attributes +case partitionCol: Attribute => + val normalizedPartition = PartitioningUtils.normalizePartitionColumn( +partitionCol.name, +partitionColumns, +tableIdentifier.quotedString, +resolver) + partitionAttributes(normalizedPartition) + }.transform { +// Cast the partition value to the data type of the corresponding partition attribute +case cmp @ BinaryComparison(partitionAttr, value) +if !partitionAttr.dataType.sameType(value.dataType) => + cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone))) --- End diff -- hmm, have you tested `Cast` cases? I look at `convertFilters` method in HiveShim.scala, and seems we don't convert `Cast` in the pushdown predicates to Hive. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216593044 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- sure, thanks @maropu. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216525719 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- Ya, looks good to me. But I'm not sure which one is the right approach, so we'd better wait for other reviewers' comments here, too. cc: @gatorsmile @viirya --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216380067 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- oh, now I see, sorry. What about then having a `Seq[Filter]` instead? In order to avoid the `splitDisjunctivePredicates`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216322634 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- yea, if you put `Expression` in the class fields of `AlterTableDropPartitionCommand` (https://github.com/apache/spark/pull/20999/files#diff-54979ed5797b4a6193cf663dc23baca5R524), it fails in `Analyzer.executeAndCheck`. But, if we put `LogicalPlan` in the class fields, IIUC it doesn't fail there. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216307940 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() +val value = Literal(visitStringConstant(pFilter.constant())) --- End diff -- ok, thanks for the check. It's ok to keep the current one. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216305161 --- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 --- @@ -261,6 +261,14 @@ partitionVal : identifier (EQ constant)? ; +dropPartitionSpec +: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')' +; + +dropPartitionVal +: identifier (comparisonOperator constant)? --- End diff -- thanks for the check. I still like meaningful messages, though; we should wait for other reviewers' comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216304811 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- I may be missing something here, so sorry if I am not understanding something, but I think the issue is that the analyzer is called anyway before the `AlterTableDropPartitionCommand.run` command and it fails because of the unresolved attributes. Moreover, in your code I don't see it being part of either `children` or `innerChildren`. I think the alternative here is to add a rule to the analyzer for this, but it seems like overkill to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216299331 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- yea, so I called the analyzer inside `AlterTableDropPartitionCommand.run`. https://github.com/apache/spark/commit/52506f1ebfb36dfaf0380a58cb68ee6aa5225de4#diff-54979ed5797b4a6193cf663dc23baca5R539 Or, if you want to resolve the plan in the analyzing phase, how about just adding it to `children` instead of `innerChildren`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216227938 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- Your approach has the same issue, ie. would fail because the `LogicalPlan` you added is not resolved after analysis. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216174690 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- For example, how about this approach? (Have you tried it already?) https://github.com/apache/spark/commit/52506f1ebfb36dfaf0380a58cb68ee6aa5225de4 It added an unresolved logical plan (an input relation and filters) to `AlterTableDropPartitionCommand`, then resolved the plan in `AlterTableDropPartitionCommand.run` and computed partition specs based on the resolved expressions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216162541 --- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 --- @@ -261,6 +261,14 @@ partitionVal : identifier (EQ constant)? ; +dropPartitionSpec +: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')' +; + +dropPartitionVal +: identifier (comparisonOperator constant)? --- End diff -- Hive throws this parser exception: ``` hive> alter table test1 drop partition(1 > c); NoViableAltException(368@[]) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:12014) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.dropPartitionVal(HiveParser_IdentifiersParser.java:11684) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.dropPartitionSpec(HiveParser_IdentifiersParser.java:11563) at org.apache.hadoop.hive.ql.parse.HiveParser.dropPartitionSpec(HiveParser.java:44851) at org.apache.hadoop.hive.ql.parse.HiveParser.alterStatementSuffixDropPartitions(HiveParser.java:11564) at org.apache.hadoop.hive.ql.parse.HiveParser.alterTableStatementSuffix(HiveParser.java:8000) at org.apache.hadoop.hive.ql.parse.HiveParser.alterStatement(HiveParser.java:7450) at org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:4340) at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:2497) at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1423) at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:209) at org.apache.hadoop.hive.ql.parse.ParseUtils.parse(ParseUtils.java:74) at org.apache.hadoop.hive.ql.parse.ParseUtils.parse(ParseUtils.java:67) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:615) at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1829) at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1776) at 
org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1771) at org.apache.hadoop.hive.ql.reexec.ReExecDriver.compileAndRespond(ReExecDriver.java:126) at org.apache.hadoop.hive.ql.reexec.ReExecDriver.run(ReExecDriver.java:214) at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:239) at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:188) at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:402) at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:832) at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:770) at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:694) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.util.RunJar.run(RunJar.java:221) at org.apache.hadoop.util.RunJar.main(RunJar.java:136) FAILED: ParseException line 1:33 cannot recognize input near '1' '>' 'c' in drop partition statement ``` so yes, it is analogous to this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216157835 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala --- @@ -927,7 +927,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } AlterTableDropPartitionCommand( visitTableIdentifier(ctx.tableIdentifier), - ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), + ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec), --- End diff -- mmmh, I am not sure how to update it. The only difference is that `specN` now can be any kind of filter instead of just `partitionColumn = value`. So it is actually the definition of specN which changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216131276 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala --- @@ -927,7 +927,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } AlterTableDropPartitionCommand( visitTableIdentifier(ctx.tableIdentifier), - ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), + ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec), --- End diff -- Can you update the comment?: https://github.com/apache/spark/blob/01c3dfab158d40653f8ce5d96f57220297545d5b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala#L916 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216128234 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- Let me have more time to check this behaviour. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216127950 --- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 --- @@ -261,6 +261,14 @@ partitionVal : identifier (EQ constant)? ; +dropPartitionSpec +: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')' +; + +dropPartitionVal +: identifier (comparisonOperator constant)? --- End diff -- yea, yes. I like user-understandable error messages. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216127931 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} +import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions._ --- End diff -- I just wanted to check if your IDE wrongly folded this import, or not. It's ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216127734 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || --- End diff -- I saw no null check in the other partition spec, which is why I thought so: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala#L274 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126194 --- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 --- @@ -261,6 +261,14 @@ partitionVal : identifier (EQ constant)? ; +dropPartitionSpec +: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')' +; + +dropPartitionVal +: identifier (comparisonOperator constant)? --- End diff -- Hive does throw an error in that case. Are you asking whether that error is a parsing exception or another kind of exception? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126249 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) --- End diff -- sure, will do ASAP, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126480 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala --- @@ -861,7 +861,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { assertUnsupported(sql2_view) val tableIdent = TableIdentifier("table_name", None) -val expected1_table = AlterTableDropPartitionCommand( + +val expected1_table = AlterTableDropPartitionCommand.fromSpecs( --- End diff -- sure, will do, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126332 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() +val value = Literal(visitStringConstant(pFilter.constant())) --- End diff -- thanks, will update --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126453 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -1015,6 +1037,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val left = expression(ctx.left) val right = expression(ctx.right) val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] +buildComparison(left, right, operator) + } + + /** + * Creates a comparison expression. The following comparison operators are supported: + * - Equal: '=' or '==' + * - Null-safe Equal: '<=>' + * - Not Equal: '<>' or '!=' + * - Less than: '<' + * - Less then or Equal: '<=' + * - Greater than: '>' + * - Greater then or Equal: '>=' + */ --- End diff -- yes, it does --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126471 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} +import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions._ --- End diff -- not sure what you mean here. The list of imports would be very long, as I use EqualTo, And, Literal, Cast, BinaryComparison, etc. I can list them all, but I am not sure it is worth it. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126217 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || --- End diff -- I am not sure. The other 2 conditions can definitely be true, but I am not sure about this. I think it is safer to check it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126491 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -577,6 +578,143 @@ class HiveDDLSuite } } + def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = { +withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value2)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), +Row(s"p=$value1") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) +} + } + + test("SPARK-14922: Drop partitions by filter") { +withTable("sales") { + sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)") + for (country <- Seq("AU", "US", "CA", "KR")) { +for (quarter <- 1 to 5) { + sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')") +} + } + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=AU/quarter=1") :: + Row("country=AU/quarter=2") :: + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=KR/quarter=5") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: + Row("country=US/quarter=5") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") 
:: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = '4')") + sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=KR/quarter=3") :: Nil) + // According to the declarative partition spec definitions, this drops the union of target + // partitions without exceptions. Hive raises exceptions because it handles them sequentially. 
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), Nil) +} +withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), +Row(s"p=false") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) +} +testDropPartition(IntegerType, 1, 2) +testDropPartition(BooleanType, false, true) +testDropPartition(LongType, 1L, 2L) +testDropPartition(ShortType, 1.toShort, 2.toShort) +testDropPartition(ByteType, 1.toByte, 2.toByte) +testDropPartition(FloatType, 1.0F, 2.0F) +
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216126328 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- Well, the answer is in `def expressions` in `QueryPlan`. In #19691, we end up with a `Seq[(TablePartitionSpec, Seq[Expression])]`, so the expressions there are not recognized/considered by the `Analyzer`. In this PR we have `Seq[Expression]` (which is way cleaner IMHO and address comment https://github.com/apache/spark/pull/19691/files#r193002268), so these expressions are considered by the Analyzer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216124388 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -1015,6 +1037,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val left = expression(ctx.left) val right = expression(ctx.right) val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] +buildComparison(left, right, operator) + } + + /** + * Creates a comparison expression. The following comparison operators are supported: + * - Equal: '=' or '==' + * - Null-safe Equal: '<=>' + * - Not Equal: '<>' or '!=' + * - Less than: '<' + * - Less then or Equal: '<=' + * - Greater than: '>' + * - Greater then or Equal: '>=' + */ --- End diff -- Does Hive also support all of the comparators above? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216124365 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() --- End diff -- This looks odd: why were we able to use `UnresolvedAttribute` in #19691? https://github.com/apache/spark/pull/19691/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R293 (Sorry, I am probably missing something.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216124252 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) +} +// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when +// running the command. The type is not relevant, it is replaced during the real resolution +val partition = + AttributeReference(pFilter.identifier().getText, StringType)() +val value = Literal(visitStringConstant(pFilter.constant())) --- End diff -- `val value = Literal(visitStringConstant(pFilter.constant()), StringType)` for better readability? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216124163 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || --- End diff -- no chance `pFilter.identifier() == null`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216124059 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { +withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => +if (pFilter.identifier() == null || pFilter.constant() == null || +pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) --- End diff -- Can you add tests for this exception in `DDLParserSuite.scala`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216123948 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} +import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions._ --- End diff -- too many imports? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216123701 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala --- @@ -861,7 +861,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { assertUnsupported(sql2_view) val tableIdent = TableIdentifier("table_name", None) -val expected1_table = AlterTableDropPartitionCommand( + +val expected1_table = AlterTableDropPartitionCommand.fromSpecs( --- End diff -- Can you add test cases to check that the parser accepts the comparators added by this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216122595 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -577,6 +578,143 @@ class HiveDDLSuite } } + def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = { +withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value2)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), +Row(s"p=$value1") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) +} + } + + test("SPARK-14922: Drop partitions by filter") { +withTable("sales") { + sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)") + for (country <- Seq("AU", "US", "CA", "KR")) { +for (quarter <- 1 to 5) { + sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')") +} + } + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=AU/quarter=1") :: + Row("country=AU/quarter=2") :: + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=KR/quarter=5") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: + Row("country=US/quarter=5") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: 
+ Row("country=KR/quarter=4") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = '4')") + sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')") + checkAnswer(sql("SHOW PARTITIONS sales"), +Row("country=KR/quarter=3") :: Nil) + // According to the declarative partition spec definitions, this drops the union of target + // partitions without exceptions. Hive raises exceptions because it handles them sequentially. 
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), Nil) +} +withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), +Row(s"p=false") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) +} +testDropPartition(IntegerType, 1, 2) +testDropPartition(BooleanType, false, true) +testDropPartition(LongType, 1L, 2L) +testDropPartition(ShortType, 1.toShort, 2.toShort) +testDropPartition(ByteType, 1.toByte, 2.toByte) +testDropPartition(FloatType, 1.0F, 2.0F) +
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r216122503 --- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 --- @@ -261,6 +261,14 @@ partitionVal : identifier (EQ constant)? ; +dropPartitionSpec +: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')' +; + +dropPartitionVal +: identifier (comparisonOperator constant)? --- End diff -- Does Hive also throw ANTLR errors for the case `2 > partCol1`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org