Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r223703615 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, - specs: Seq[TablePartitionSpec], + partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog + val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) + val partitionColumns = table.partitionColumnNames + val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") - val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( - spec, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) + val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { + generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { + val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => + key.name -> value.toString + }.toMap + PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, + table.identifier, resolvedSpecs, 
ignoreIfNotExists = ifExists, purge = purge, retainData = retainData) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } + def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = { + partitionFilterSpec.exists(!_.isInstanceOf[EqualTo]) + } + + def generatePartitionSpec( + partitionFilterSpec: Seq[Expression], + partitionColumns: Seq[String], + partitionAttributes: Map[String, Attribute], + tableIdentifier: TableIdentifier, + catalog: SessionCatalog, + resolver: Resolver, + timeZone: Option[String], + ifExists: Boolean): Seq[TablePartitionSpec] = { + val filters = partitionFilterSpec.map { pFilter => + pFilter.transform { + // Resolve the partition attributes + case partitionCol: Attribute => + val normalizedPartition = PartitioningUtils.normalizePartitionColumn( + partitionCol.name, + partitionColumns, + tableIdentifier.quotedString, + resolver) + partitionAttributes(normalizedPartition) + }.transform { + // Cast the partition value to the data type of the corresponding partition attribute + case cmp @ BinaryComparison(partitionAttr, value) + if !partitionAttr.dataType.sameType(value.dataType) => + cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone))) --- End diff -- The predicates are not actually converted to Hive's partition predicates. If it can't convert the predicates, `getPartitionsByFilter` will call `getAllPartitionsMethod` to fetch all partitions.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org