This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 02c1a867d8e0 [HUDI-9726] Support for generic filters for spark
procedures (#13736)
02c1a867d8e0 is described below
commit 02c1a867d8e034b99bd01c00c5d1e34ede060e17
Author: Vamshi Krishna Kyatham
<[email protected]>
AuthorDate: Fri Aug 22 16:57:32 2025 -0700
[HUDI-9726] Support for generic filters for spark procedures (#13736)
---
.../procedures/HoodieProcedureFilterUtils.scala | 493 +++++++++++++++++++++
.../procedures/ShowCleansPlanProcedure.scala | 120 ++++-
.../command/procedures/ShowCleansProcedure.scala | 93 +++-
.../hudi/procedure/TestShowCleansProcedures.scala | 137 ++++++
4 files changed, 839 insertions(+), 4 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureFilterUtils.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureFilterUtils.scala
new file mode 100644
index 000000000000..533c80b92777
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureFilterUtils.scala
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{Expression,
GenericInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Utility object for filtering procedure results using SQL expressions.
+ *
+ * Supports all Spark SQL data types including:
+ * - Primitive types: Boolean, Byte, Short, Int, Long, Float, Double, String,
Binary
+ * - Date/Time types: Date, Timestamp, Instant, LocalDate, LocalDateTime
+ * - Decimal types: BigDecimal with precision/scale
+ * - Complex types: Array, Map, Struct (Row)
+ * - Nested combinations of all above types
+ */
+object HoodieProcedureFilterUtils {
+
+ /**
+ * Evaluates a SQL filter expression against a sequence of rows.
+ *
+ * @param rows The rows to filter
+ * @param filterExpression SQL expression string
+ * @param schema The schema of the rows
+ * @param sparkSession Spark session for expression parsing
+ * @return Filtered rows that match the expression
+ */
+ def evaluateFilter(rows: Seq[Row], filterExpression: String, schema:
StructType, sparkSession: SparkSession): Seq[Row] = {
+
+ if (filterExpression == null || filterExpression.trim.isEmpty) {
+ rows
+ } else {
+ Try {
+ val parsedExpr =
sparkSession.sessionState.sqlParser.parseExpression(filterExpression)
+
+ rows.filter { row =>
+ evaluateExpressionOnRow(parsedExpr, row, schema)
+ }
+ } match {
+ case Success(filteredRows) => filteredRows
+ case Failure(exception) =>
+ throw new IllegalArgumentException(
+ s"Failed to parse or evaluate filter expression
'$filterExpression': ${exception.getMessage}",
+ exception
+ )
+ }
+ }
+ }
+
+ private def evaluateExpressionOnRow(expression: Expression, row: Row,
schema: StructType): Boolean = {
+
+ val internalRow = convertRowToInternalRow(row, schema)
+
+ Try {
+ // First pass: bind attributes
+ val attributeBound = expression.transform {
+ case attr: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
=>
+ try {
+ val fieldIndex = schema.fieldIndex(attr.name)
+ val field = schema.fields(fieldIndex)
+
org.apache.spark.sql.catalyst.expressions.BoundReference(fieldIndex,
field.dataType, field.nullable)
+ } catch {
+ case _: IllegalArgumentException => attr
+ }
+ }
+
+ // Second pass: resolve functions
+ val functionResolved = attributeBound.transform {
+ case unresolvedFunc:
org.apache.spark.sql.catalyst.analysis.UnresolvedFunction =>
+ unresolvedFunc.nameParts.head.toLowerCase match {
+ case "upper" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Upper(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "lower" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Lower(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "length" | "len" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Length(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "trim" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.StringTrim(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "ltrim" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.StringTrimLeft(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "rtrim" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.StringTrimRight(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "substring" | "substr" =>
+ if (unresolvedFunc.arguments.length == 3) {
+ org.apache.spark.sql.catalyst.expressions.Substring(
+ unresolvedFunc.arguments(0),
+ unresolvedFunc.arguments(1),
+ unresolvedFunc.arguments(2)
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "abs" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Abs(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "round" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Round(unresolvedFunc.arguments.head,
org.apache.spark.sql.catalyst.expressions.Literal(0))
+ } else if (unresolvedFunc.arguments.length == 2) {
+
org.apache.spark.sql.catalyst.expressions.Round(unresolvedFunc.arguments(0),
unresolvedFunc.arguments(1))
+ } else {
+ unresolvedFunc
+ }
+ case "ceil" | "ceiling" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Ceil(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "floor" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Floor(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "year" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Year(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "month" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Month(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "day" | "dayofmonth" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.DayOfMonth(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "hour" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Hour(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "size" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Size(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "map_keys" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.MapKeys(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "map_values" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.MapValues(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "array_contains" =>
+ if (unresolvedFunc.arguments.length == 2) {
+ org.apache.spark.sql.catalyst.expressions.ArrayContains(
+ unresolvedFunc.arguments.head,
+ unresolvedFunc.arguments(1)
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "array_size" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.Size(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "sort_array" =>
+ if (unresolvedFunc.arguments.length == 1) {
+ org.apache.spark.sql.catalyst.expressions.SortArray(
+ unresolvedFunc.arguments.head,
+ org.apache.spark.sql.catalyst.expressions.Literal(true)
+ )
+ } else if (unresolvedFunc.arguments.length == 2) {
+ org.apache.spark.sql.catalyst.expressions.SortArray(
+ unresolvedFunc.arguments.head,
+ unresolvedFunc.arguments(1)
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "like" =>
+ if (unresolvedFunc.arguments.length == 2) {
+ org.apache.spark.sql.catalyst.expressions.Like(
+ unresolvedFunc.arguments.head,
+ unresolvedFunc.arguments(1),
+ '\\'
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "rlike" | "regexp_like" =>
+ if (unresolvedFunc.arguments.length == 2) {
+ org.apache.spark.sql.catalyst.expressions.RLike(
+ unresolvedFunc.arguments.head,
+ unresolvedFunc.arguments(1)
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "regexp_extract" =>
+ if (unresolvedFunc.arguments.length == 3) {
+ org.apache.spark.sql.catalyst.expressions.RegExpExtract(
+ unresolvedFunc.arguments.head,
+ unresolvedFunc.arguments(1),
+ unresolvedFunc.arguments(2)
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "date_format" =>
+ if (unresolvedFunc.arguments.length == 2) {
+ org.apache.spark.sql.catalyst.expressions.DateFormatClass(
+ unresolvedFunc.arguments.head,
+ unresolvedFunc.arguments(1)
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "datediff" =>
+ if (unresolvedFunc.arguments.length == 2) {
+ org.apache.spark.sql.catalyst.expressions.DateDiff(
+ unresolvedFunc.arguments.head,
+ unresolvedFunc.arguments(1)
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "isnull" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.IsNull(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "isnotnull" =>
+ if (unresolvedFunc.arguments.length == 1) {
+
org.apache.spark.sql.catalyst.expressions.IsNotNull(unresolvedFunc.arguments.head)
+ } else {
+ unresolvedFunc
+ }
+ case "coalesce" =>
+ if (unresolvedFunc.arguments.nonEmpty) {
+
org.apache.spark.sql.catalyst.expressions.Coalesce(unresolvedFunc.arguments)
+ } else {
+ unresolvedFunc
+ }
+ case "string" =>
+ if (unresolvedFunc.arguments.length == 1) {
+ org.apache.spark.sql.catalyst.expressions.Cast(
+ unresolvedFunc.arguments.head,
+ org.apache.spark.sql.types.StringType
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "int" | "integer" =>
+ if (unresolvedFunc.arguments.length == 1) {
+ org.apache.spark.sql.catalyst.expressions.Cast(
+ unresolvedFunc.arguments.head,
+ org.apache.spark.sql.types.IntegerType
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "long" | "bigint" =>
+ if (unresolvedFunc.arguments.length == 1) {
+ org.apache.spark.sql.catalyst.expressions.Cast(
+ unresolvedFunc.arguments.head,
+ org.apache.spark.sql.types.LongType
+ )
+ } else {
+ unresolvedFunc
+ }
+ case "double" =>
+ if (unresolvedFunc.arguments.length == 1) {
+ org.apache.spark.sql.catalyst.expressions.Cast(
+ unresolvedFunc.arguments.head,
+ org.apache.spark.sql.types.DoubleType
+ )
+ } else {
+ unresolvedFunc
+ }
+ case _ => unresolvedFunc
+ }
+ }
+
+ // Third pass: handle type coercion for numeric comparisons
+ val boundExpr = functionResolved.transformUp {
+ case eq: org.apache.spark.sql.catalyst.expressions.EqualTo =>
+ applyTypeCoercion(eq.left, eq.right,
org.apache.spark.sql.catalyst.expressions.EqualTo.apply, eq)
+ case gt: org.apache.spark.sql.catalyst.expressions.GreaterThan =>
+ applyTypeCoercion(gt.left, gt.right,
org.apache.spark.sql.catalyst.expressions.GreaterThan.apply, gt)
+ case gte: org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual
=>
+ applyTypeCoercion(gte.left, gte.right,
org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.apply, gte)
+ case lt: org.apache.spark.sql.catalyst.expressions.LessThan =>
+ applyTypeCoercion(lt.left, lt.right,
org.apache.spark.sql.catalyst.expressions.LessThan.apply, lt)
+ case lte: org.apache.spark.sql.catalyst.expressions.LessThanOrEqual =>
+ applyTypeCoercion(lte.left, lte.right,
org.apache.spark.sql.catalyst.expressions.LessThanOrEqual.apply, lte)
+ }
+ val result = boundExpr.eval(internalRow)
+
+ result match {
+ case null => false
+ case boolean: Boolean => boolean
+ case other =>
+ other.toString.toLowerCase match {
+ case "true" => true
+ case "false" => false
+ case _ => false
+ }
+ }
+ } match {
+ case Success(result) => result
+ case Failure(_) => false
+ }
+ }
+
+ private def convertRowToInternalRow(row: Row, schema: StructType):
GenericInternalRow = {
+ val values = schema.fields.zipWithIndex.map { case (field, index) =>
+ if (row.isNullAt(index)) {
+ null
+ } else {
+ convertValueToInternal(row.get(index), field.dataType)
+ }
+ }
+ new GenericInternalRow(values)
+ }
+
+ private def convertValueToInternal(value: Any, dataType: DataType): Any = {
+ import org.apache.spark.sql.types._
+
+ value match {
+ case null => null
+ case s: String => UTF8String.fromString(s)
+ case ts: java.sql.Timestamp => DateTimeUtils.fromJavaTimestamp(ts)
+ case date: java.sql.Date => DateTimeUtils.fromJavaDate(date)
+ case instant: java.time.Instant => DateTimeUtils.instantToMicros(instant)
+ case localDate: java.time.LocalDate =>
DateTimeUtils.localDateToDays(localDate)
+ case localDateTime: java.time.LocalDateTime =>
DateTimeUtils.localDateTimeToMicros(localDateTime)
+ case byte: Byte => byte
+ case short: Short => short
+ case int: Int => int
+ case long: Long => long
+ case float: Float => float
+ case double: Double => double
+ case decimal: java.math.BigDecimal =>
+ org.apache.spark.sql.types.Decimal(decimal,
dataType.asInstanceOf[DecimalType].precision,
dataType.asInstanceOf[DecimalType].scale)
+ case decimal: scala.math.BigDecimal =>
+ org.apache.spark.sql.types.Decimal(decimal,
dataType.asInstanceOf[DecimalType].precision,
dataType.asInstanceOf[DecimalType].scale)
+ case bool: Boolean => bool
+ case bytes: Array[Byte] => bytes
+ case array: Array[_] =>
+ val arrayType = dataType.asInstanceOf[ArrayType]
+ array.map(convertValueToInternal(_, arrayType.elementType))
+ case list: java.util.List[_] =>
+ val arrayType = dataType.asInstanceOf[ArrayType]
+ list.asScala.map(convertValueToInternal(_,
arrayType.elementType)).toArray
+ case seq: Seq[_] =>
+ val arrayType = dataType.asInstanceOf[ArrayType]
+ seq.map(convertValueToInternal(_, arrayType.elementType)).toArray
+ case map: java.util.Map[_, _] =>
+ val mapType = dataType.asInstanceOf[MapType]
+ val convertedKeys = map.asScala.keys.map(convertValueToInternal(_,
mapType.keyType)).toArray
+ val convertedValues = map.asScala.values.map(convertValueToInternal(_,
mapType.valueType)).toArray
+ org.apache.spark.sql.catalyst.util.ArrayBasedMapData(convertedKeys,
convertedValues)
+ case map: scala.collection.Map[_, _] =>
+ val mapType = dataType.asInstanceOf[MapType]
+ val convertedKeys = map.keys.map(convertValueToInternal(_,
mapType.keyType)).toArray
+ val convertedValues = map.values.map(convertValueToInternal(_,
mapType.valueType)).toArray
+ org.apache.spark.sql.catalyst.util.ArrayBasedMapData(convertedKeys,
convertedValues)
+ case row: org.apache.spark.sql.Row =>
+ val structType = dataType.asInstanceOf[StructType]
+ val values = structType.fields.zipWithIndex.map { case (field, index)
=>
+ if (row.isNullAt(index)) {
+ null
+ } else {
+ convertValueToInternal(row.get(index), field.dataType)
+ }
+ }
+ new GenericInternalRow(values)
+ case utf8: UTF8String => utf8
+ case internalRow: org.apache.spark.sql.catalyst.InternalRow =>
internalRow
+ case mapData: org.apache.spark.sql.catalyst.util.MapData => mapData
+ case arrayData: org.apache.spark.sql.catalyst.util.ArrayData => arrayData
+ case decimal: org.apache.spark.sql.types.Decimal => decimal
+ case uuid: java.util.UUID => UTF8String.fromString(uuid.toString)
+ case other => other
+ }
+ }
+
+ def validateFilterExpression(filterExpression: String, schema: StructType,
sparkSession: SparkSession): Either[String, Unit] = {
+
+ if (filterExpression == null || filterExpression.trim.isEmpty) {
+ Right(())
+ } else {
+ Try {
+ val parsedExpr =
sparkSession.sessionState.sqlParser.parseExpression(filterExpression)
+ val columnNames = schema.fieldNames.toSet
+ val referencedColumns = extractColumnReferences(parsedExpr)
+ val invalidColumns = referencedColumns -- columnNames
+
+ if (invalidColumns.nonEmpty) {
+ Left(s"Invalid column references: ${invalidColumns.mkString(", ")}.
Available columns: ${columnNames.mkString(", ")}")
+ } else {
+ Right(())
+ }
+ } match {
+ case Success(result) => result
+ case Failure(exception) => Left(s"Invalid filter expression:
${exception.getMessage}")
+ }
+ }
+ }
+
+ private def extractColumnReferences(expression: Expression): Set[String] = {
+ import org.apache.spark.sql.catalyst.expressions._
+
+ expression match {
+ case attr: AttributeReference => Set(attr.name)
+ case unresolved: UnresolvedAttribute => Set(unresolved.name)
+ case _ => expression.children.flatMap(extractColumnReferences).toSet
+ }
+ }
+
+ private def applyTypeCoercion[T <:
org.apache.spark.sql.catalyst.expressions.Expression](
+
left: org.apache.spark.sql.catalyst.expressions.Expression,
+
right: org.apache.spark.sql.catalyst.expressions.Expression,
+
constructor:
(org.apache.spark.sql.catalyst.expressions.Expression,
org.apache.spark.sql.catalyst.expressions.Expression) => T,
+
original: T): T = {
+ (left, right) match {
+ case (boundRef:
org.apache.spark.sql.catalyst.expressions.BoundReference, literal:
org.apache.spark.sql.catalyst.expressions.Literal)
+ if boundRef.dataType == org.apache.spark.sql.types.LongType &&
literal.dataType == org.apache.spark.sql.types.IntegerType =>
+ val castExpr =
org.apache.spark.sql.catalyst.expressions.Cast(boundRef,
org.apache.spark.sql.types.IntegerType)
+ constructor(castExpr, literal)
+ case _ => original
+ }
+ }
+}
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansPlanProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansPlanProcedure.scala
index fc0ef22b1f57..9b599a2bc5cc 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansPlanProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansPlanProcedure.scala
@@ -31,6 +31,106 @@ import java.util.function.Supplier
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
+/**
+ * Spark SQL procedure to show clean plans for a Hudi table.
+ *
+ * This procedure displays information about clean operations that have been
planned but not yet executed.
+ * Clean plans contain metadata about which files are scheduled for deletion
during the next clean operation.
+ *
+ * == Parameters ==
+ * - `table`: Required. The name of the Hudi table to query
+ * - `limit`: Optional. Maximum number of clean plans to return (default: 10)
+ * - `showArchived`: Optional. Whether to include archived clean plans
(default: false)
+ * - `filter`: Optional. SQL expression to filter results (default: empty
string)
+ *
+ * == Output Schema ==
+ * - `plan_time`: Timestamp when the clean plan was created
+ * - `state`: Current state of the clean plan (REQUESTED, INFLIGHT, COMPLETED)
+ * - `action`: The action type (always 'clean')
+ * - `earliest_instant_to_retain`: The earliest commit that will be retained
after cleaning
+ * - `last_completed_commit_timestamp`: The last completed commit at the time
of planning
+ * - `policy`: The clean policy used (e.g., KEEP_LATEST_COMMITS,
KEEP_LATEST_FILE_VERSIONS)
+ * - `version`: Version of the clean plan metadata format
+ * - `total_partitions_to_clean`: Number of partitions that have files to be
cleaned
+ * - `total_partitions_to_delete`: Number of partitions that will have files
deleted
+ * - `extra_metadata`: Additional metadata associated with the clean plan
+ *
+ * == Error Handling ==
+ * - Throws `IllegalArgumentException` for invalid filter expressions
+ * - Throws `HoodieException` for table access issues
+ * - Returns empty result set if no clean plans match the criteria
+ *
+ * == Filter Support ==
+ * The `filter` parameter supports SQL expressions that can be applied to any
output column.
+ * The filter uses Spark SQL syntax and supports various data types and
operations.
+ *
+ * === Filter Examples ===
+ * {{{
+ * -- Show clean plans created after a specific timestamp
+ * CALL show_clean_plans(
+ * table => 'my_table',
+ * filter => "plan_time > '20241201000000'"
+ * )
+ *
+ * -- Show clean plans that will clean many partitions
+ * CALL show_clean_plans(
+ * table => 'my_table',
+ * filter => "total_partitions_to_clean > 10"
+ * )
+ *
+ * -- Show recent clean plans with complex conditions
+ * CALL show_clean_plans(
+ * table => 'my_table',
+ * filter => "plan_time > '20241201000000' AND total_partitions_to_delete
BETWEEN 1 AND 100"
+ * )
+ *
+ * -- Show clean plans using string functions and policy filters
+ * CALL show_clean_plans(
+ * table => 'my_table',
+ * filter => "LENGTH(earliest_instant_to_retain) > 10 AND policy =
'KEEP_LATEST_COMMITS'"
+ * )
+ *
+ * -- Show clean plans with null checks and state filtering
+ * CALL show_clean_plans(
+ * table => 'my_table',
+ * filter => "last_completed_commit_timestamp IS NOT NULL AND state =
'COMPLETED'"
+ * )
+ *
+ * -- Show clean plans using IN operator for states
+ * CALL show_clean_plans(
+ * table => 'my_table',
+ * filter => "state IN ('REQUESTED', 'INFLIGHT') AND
total_partitions_to_clean > 0"
+ * )
+ * }}}
+ *
+ * == Usage Examples ==
+ * {{{
+ * -- Basic usage: Show last 10 clean plans
+ * CALL show_clean_plans(table => 'hudi_table_2')
+ *
+ * -- Show more results with custom limit
+ * CALL show_clean_plans(table => 'hudi_table_2', limit => 50)
+ *
+ * -- Include archived clean plans
+ * CALL show_clean_plans(table => 'hudi_table_2', showArchived => true)
+ *
+ * -- Filter for recent clean plans
+ * CALL show_clean_plans(
+ * table => 'hudi_table_2',
+ * filter => "plan_time > '20241201000000'"
+ * )
+ *
+ * -- Show clean plans that will clean many partitions
+ * CALL show_clean_plans(
+ * table => 'hudi_table_2',
+ * filter => "total_partitions_to_clean > 5",
+ * limit => 20
+ * )
+ * }}}
+ *
+ * @see [[ShowCleansProcedure]] for information about completed clean
operations
+ * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax
+ */
class ShowCleansPlanProcedure extends BaseProcedure with ProcedureBuilder with
SparkAdapterSupport with Logging {
import ShowCleansPlanProcedure._
@@ -45,10 +145,19 @@ class ShowCleansPlanProcedure extends BaseProcedure with
ProcedureBuilder with S
val tableName = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String]
val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
val showArchived = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[Boolean]
+ val filter = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[String]
validateInputs(tableName, limit)
- Try {
+ if (filter != null && filter.trim.nonEmpty) {
+ HoodieProcedureFilterUtils.validateFilterExpression(filter, outputType,
sparkSession) match {
+ case Left(errorMessage) =>
+ throw new IllegalArgumentException(s"Invalid filter expression:
$errorMessage")
+ case Right(_) => // Validation passed, continue
+ }
+ }
+
+ val rows = Try {
val hoodieCatalogTable =
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, tableName)
val metaClient = createMetaClient(jsc, hoodieCatalogTable.tableLocation)
getCleanerPlans(metaClient, limit, showArchived)
@@ -59,6 +168,12 @@ class ShowCleansPlanProcedure extends BaseProcedure with
ProcedureBuilder with S
logError(errorMsg, exception)
throw new HoodieException(s"$errorMsg: ${exception.getMessage}",
exception)
}
+
+ if (filter != null && filter.trim.nonEmpty) {
+ HoodieProcedureFilterUtils.evaluateFilter(rows, filter, outputType,
sparkSession)
+ } else {
+ rows
+ }
}
override def build: Procedure = new ShowCleansPlanProcedure()
@@ -184,7 +299,8 @@ object ShowCleansPlanProcedure {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.optional(2, "showArchived", DataTypes.BooleanType,
false)
+ ProcedureParameter.optional(2, "showArchived", DataTypes.BooleanType,
false),
+ ProcedureParameter.optional(3, "filter", DataTypes.StringType, "")
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala
index cd918c832e3e..1e560f7ecd5d 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala
@@ -30,12 +30,88 @@ import java.util.function.Supplier
import scala.collection.JavaConverters._
+/**
+ * Spark SQL procedure to show completed clean operations for a Hudi table.
+ *
+ * This procedure displays information about clean operations that have been
executed.
+ * Clean operations remove old file versions to reclaim storage space and
maintain table performance.
+ *
+ * == Parameters ==
+ * - `table`: Required. The name of the Hudi table to query
+ * - `limit`: Optional. Maximum number of clean operations to return (default:
10)
+ * - `showArchived`: Optional. Whether to include archived clean operations
(default: false)
+ * - `filter`: Optional. SQL expression to filter results (default: empty
string)
+ *
+ * == Output Schema ==
+ * - `clean_time`: Timestamp when the clean operation was performed
+ * - `state_transition_time`: Time when the clean transitioned to completed
state
+ * - `action`: The action type (always 'clean')
+ * - `start_clean_time`: When the clean operation started
+ * - `time_taken_in_millis`: Duration of the clean operation in milliseconds
+ * - `total_files_deleted`: Total number of files deleted during the clean
+ * - `earliest_commit_to_retain`: The earliest commit that was retained
+ * - `last_completed_commit_timestamp`: The last completed commit at clean time
+ * - `version`: Version of the clean operation metadata
+ * - Additional partition-level metadata columns when using
`show_cleans_metadata`
+ *
+ * == Error Handling ==
+ * - Throws `IllegalArgumentException` for invalid filter expressions
+ * - Throws `HoodieException` for table access issues
+ * - Returns empty result set if no clean operations match the criteria
+ *
+ * == Filter Support ==
+ * The `filter` parameter supports SQL expressions for filtering results.
+ *
+ * === Common Filter Examples ===
+ * {{{
+ * -- Show cleans that deleted many files
+ * CALL show_cleans(
+ * table => 'my_table',
+ * filter => "total_files_deleted > 100"
+ * )
+ *
+ * -- Show recent clean operations
+ * CALL show_cleans(
+ * table => 'my_table',
+ * filter => "clean_time > '20231201000000'"
+ * )
+ *
+ * -- Show slow clean operations
+ * CALL show_cleans(
+ * table => 'my_table',
+ * filter => "time_taken_in_millis > 60000"
+ * )
+ * }}}
+ *
+ * == Usage Examples ==
+ * {{{
+ * -- Basic usage: Show last 10 completed cleans
+ * CALL show_cleans(table => 'hudi_table_1')
+ *
+ * -- Show clean operations with partition metadata
+ * CALL show_cleans_metadata(table => 'hudi_table_1')
+ *
+ * -- Include archived clean operations
+ * CALL show_cleans(table => 'hudi_table_1', showArchived => true)
+ *
+ * -- Filter for recent efficient cleans
+ * CALL show_cleans(
+ * table => 'hudi_table_1',
+ * filter => "clean_time > '20231201000000' AND total_files_deleted > 0"
+ * )
+ * }}}
+ *
+ * @param includePartitionMetadata Whether to include partition-level metadata
in output
+ * @see [[ShowCleansPlanProcedure]] for information about planned clean
operations
+ * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax
+ */
class ShowCleansProcedure(includePartitionMetadata: Boolean) extends
BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.optional(2, "showArchived", DataTypes.BooleanType,
false)
+ ProcedureParameter.optional(2, "showArchived", DataTypes.BooleanType,
false),
+ ProcedureParameter.optional(3, "filter", DataTypes.StringType, "")
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -75,6 +151,15 @@ class ShowCleansProcedure(includePartitionMetadata:
Boolean) extends BaseProcedu
val table = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String]
val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
val showArchived = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[Boolean]
+ val filter = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[String]
+
+ if (filter != null && filter.trim.nonEmpty) {
+ HoodieProcedureFilterUtils.validateFilterExpression(filter, outputType,
sparkSession) match {
+ case Left(errorMessage) =>
+ throw new IllegalArgumentException(s"Invalid filter expression:
$errorMessage")
+ case Right(_) => // Validation passed, continue
+ }
+ }
val hoodieCatalogTable =
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
val basePath = hoodieCatalogTable.tableLocation
@@ -97,7 +182,11 @@ class ShowCleansProcedure(includePartitionMetadata:
Boolean) extends BaseProcedu
} else {
activeResults
}
- finalResults
+ if (filter != null && filter.trim.nonEmpty) {
+ HoodieProcedureFilterUtils.evaluateFilter(finalResults, filter,
outputType, sparkSession)
+ } else {
+ finalResults
+ }
}
override def build: Procedure = new
ShowCleansProcedure(includePartitionMetadata)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowCleansProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowCleansProcedures.scala
index 0b97b41e6dd4..6030b931b0a6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowCleansProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowCleansProcedures.scala
@@ -84,6 +84,13 @@ class TestShowCleansProcedures extends
HoodieSparkProcedureTestBase {
val planTime = plan.getString(0)
assert(planTime.nonEmpty && planTime.toLong > 0, "Plan time should
be a valid timestamp")
}
+ val sortedPlans = secondCleanPlans.sortBy(_.getString(0))
+ val actualFirstCleanTime = sortedPlans(0).getString(0)
+ val startTimeStr = (actualFirstCleanTime.toLong + 1000).toString
+ val afterStartFilter = spark.sql(s"""call show_clean_plans(table =>
'$tableName', filter => "plan_time > '$startTimeStr'")""")
+ afterStartFilter.show(false)
+ val afterStartRows = afterStartFilter.collect()
+ assertResult(afterStartRows.length)(1)
}
}
}
@@ -374,4 +381,134 @@ class TestShowCleansProcedures extends
HoodieSparkProcedureTestBase {
spark.sql(s"call show_cleans_metadata(table =>
'$nonExistentTable')").collect()
}
}
+
+ test("Test cleaning with some complex filters") {
+ withSQLConf("hoodie.clean.automatic" -> "false",
"hoodie.parquet.max.file.size" -> "10000") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ if (HoodieSparkUtils.isSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled = false")
+ }
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ | )
+ |""".stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
+ spark.sql(s"update $tableName set price = 11 where id = 1")
+
+ spark.sql(s"call run_clean(table => '$tableName', retain_commits =>
1)").collect()
+
+ spark.sql(s"update $tableName set price = 12 where id = 1")
+ spark.sql(s"call run_clean(table => '$tableName', retain_commits =>
1)").collect()
+
+ spark.sql(s"update $tableName set price = 13 where id = 1")
+ spark.sql(s"call run_clean(table => '$tableName', retain_commits =>
1)").collect()
+
+ val allCleans = spark.sql(s"call show_cleans(table => '$tableName')")
+ allCleans.show(false)
+ val allCleansDf = allCleans.collect()
+ val firstCleanTime = if (allCleansDf.nonEmpty)
allCleansDf.last.getAs[String]("clean_time") else "0"
+
+ val firstCleanDF = spark.sql(
+ s"""call show_cleans(table => '$tableName', filter => "clean_time =
'$firstCleanTime' AND action = 'clean'")"""
+ )
+ firstCleanDF.show(false)
+ val firstClean = firstCleanDF.collect()
+
+ val laterCleansDF = spark.sql(
+ s"""call show_cleans(table => '$tableName', filter => "clean_time >
'$firstCleanTime' AND action = 'clean'")"""
+ )
+ laterCleansDF.show(false)
+ val laterCleans = laterCleansDF.collect()
+
+ val numericFilterDF = spark.sql(
+ s"""call show_cleans(table => '$tableName', filter =>
"total_files_deleted > 0 AND LENGTH(action) > 3")"""
+ )
+ numericFilterDF.show(false)
+ val numericFilter = numericFilterDF.collect()
+
+ assert(firstClean.length == 1, "First clean filter should execute
successfully")
+ assert(laterCleans.length == allCleansDf.length - 1, "Later cleans
filter should execute successfully")
+ assert(numericFilter.length == allCleansDf.length, "Numeric filter
should execute successfully")
+ }
+ }
+ }
+
+ test("Test filter expressions with various data types") {
+ withSQLConf("hoodie.clean.automatic" -> "false",
"hoodie.parquet.max.file.size" -> "10000") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ if (HoodieSparkUtils.isSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled = false")
+ }
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | active boolean,
+ | ts long
+ | ) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ | )
+ |""".stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'product1', 99.99, true,
1000)")
+ spark.sql(s"insert into $tableName values(2, 'product2', 149.99,
false, 2000)")
+
+ spark.sql(s"update $tableName set price = 109.99 where id = 1")
+ spark.sql(s"update $tableName set price = 119.99 where id = 1")
+ spark.sql(s"update $tableName set price = 129.99 where id = 2")
+ spark.sql(s"update $tableName set price = 139.99 where id = 2")
+
+ spark.sql(s"insert into $tableName values(3, 'product3', 199.99, true,
3000)")
+ spark.sql(s"update $tableName set price = 149.99 where id = 1")
+
+ spark.sql(s"call run_clean(table => '$tableName', retain_commits =>
2)").collect()
+
+ val allCleansDF = spark.sql(s"call show_cleans(table => '$tableName',
showArchived => true)")
+ allCleansDF.show(false)
+
+ val filterTests = Seq(
+ ("action = 'clean'", "String equality"),
+ ("action LIKE 'clean%'", "String LIKE pattern"),
+ ("UPPER(action) = 'CLEAN'", "String function with equality"),
+ ("LENGTH(clean_time) > 5", "String length function"),
+ ("total_files_deleted >= 0", "Numeric comparison"),
+ ("time_taken_in_millis BETWEEN 0 AND 999999", "Numeric BETWEEN"),
+ ("clean_time IS NOT NULL", "NULL check"),
+ ("action = 'clean' AND total_files_deleted >= 0", "AND logic"),
+ ("total_files_deleted >= 0 OR time_taken_in_millis >= 0", "OR
logic"),
+ ("NOT (total_files_deleted < 0)", "NOT logic"),
+ ("action IN ('clean', 'commit', 'rollback')", "IN operator")
+ )
+
+ filterTests.foreach { case (filterExpr, description) =>
+ val filteredResult = spark.sql(
+ s"""call show_cleans(table => '$tableName',
+ |filter => "$filterExpr")""".stripMargin
+ ).collect()
+ assert(filteredResult.length > 0, s"Filter '$description' should
execute successfully")
+ }
+ }
+ }
+ }
}