This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7615188e04 [spark] Allow automatic casting in the where condition of compact procedure (#6056)
7615188e04 is described below

commit 7615188e04f197266d404f96050b3e5f07473505
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Aug 12 09:47:54 2025 +0800

    [spark] Allow automatic casting in the where condition of compact procedure (#6056)
---
 .../analysis/expressions/ExpressionHelper.scala    | 23 ++++++++++++++++
 .../paimon/spark/SparkV2FilterConverter.scala      | 30 +++++++++++++++++++-
 .../analysis/expressions/ExpressionHelper.scala    | 32 +++++++++++++++-------
 .../spark/procedure/CompactProcedureTestBase.scala | 17 ++++++++++++
 4 files changed, 91 insertions(+), 11 deletions(-)
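
    For context, a minimal usage sketch of what this change enables, mirroring the test added
    in this commit (table and column names are taken from that test; it assumes a Paimon catalog
    is configured, and relies on Spark's analyzer to insert the casts needed between the where
    condition's literals and the partition column type):

        spark.sql("CREATE TABLE t (id INT, value STRING, day_part LONG) PARTITIONED BY (day_part)")
        spark.sql("INSERT INTO t VALUES (1, 'a', 20250810), (2, 'b', 20250811)")
        // the INT literals below are implicitly cast to the LONG partition column type
        // before being converted to Paimon predicates
        spark.sql("CALL sys.compact(table => 't', where => 'day_part < 20250811 and day_part > 20250809')")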

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 56223c36cd..6cc1ce4794 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -23,7 +23,11 @@ import org.apache.paimon.spark.SparkFilterConverter
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilter}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 trait ExpressionHelper extends ExpressionHelperBase {
 
@@ -52,4 +56,23 @@ trait ExpressionHelper extends ExpressionHelperBase {
       Some(PredicateBuilder.and(predicates: _*))
     }
   }
+
+  def resolveFilter(
+      spark: SparkSession,
+      relation: DataSourceV2Relation,
+      conditionSql: String): Expression = {
+    val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql)
+    val filter = Filter(unResolvedExpression, relation)
+    spark.sessionState.analyzer.execute(filter) match {
+      case filter: Filter =>
+        try {
+          ConstantFolding.apply(filter).asInstanceOf[Filter].condition
+        } catch {
+          case _: Throwable => filter.condition
+        }
+      case _ =>
+        throw new RuntimeException(
+          s"Could not resolve expression $conditionSql in relation: $relation")
+    }
+  }
 }
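
Roughly, the new resolveFilter analyzes the parsed condition against the DataSourceV2Relation so
column references are resolved and implicit casts are inserted, then applies ConstantFolding so
casted literals collapse back into plain literals that the filter converters can handle. A small
standalone sketch of the underlying Spark behavior this relies on (plain Spark APIs only, not
Paimon-specific; the view and column names are illustrative):

    import org.apache.spark.sql.SparkSession

    object CastFoldingSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("cast-folding-sketch").getOrCreate()
        // day_part is LONG here; the analyzer casts the INT literal to LONG, and
        // constant folding evaluates the cast, so the optimized filter compares two longs.
        spark.range(1).selectExpr("id AS day_part").createOrReplaceTempView("v")
        spark.sql("SELECT * FROM v WHERE day_part < 20250811").explain(true)
        spark.stop()
      }
    }
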
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
index a3615e51d3..caf6986f1e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSpa
 import org.apache.paimon.types.{DataTypeRoot, DecimalType, RowType}
 import org.apache.paimon.types.DataTypeRoot._
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.{Literal, NamedReference}
 import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate}
@@ -54,6 +55,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
             // TODO deal with isNaN
             val index = fieldIndex(fieldName)
             builder.equal(index, convertLiteral(index, literal))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case EQUAL_NULL_SAFE =>
@@ -65,6 +68,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
             } else {
               builder.equal(index, convertLiteral(index, literal))
             }
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case GREATER_THAN =>
@@ -72,6 +77,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
           case Some((fieldName, literal)) =>
             val index = fieldIndex(fieldName)
             builder.greaterThan(index, convertLiteral(index, literal))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case GREATER_THAN_OR_EQUAL =>
@@ -79,6 +86,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
           case Some((fieldName, literal)) =>
             val index = fieldIndex(fieldName)
             builder.greaterOrEqual(index, convertLiteral(index, literal))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case LESS_THAN =>
@@ -86,6 +95,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
           case Some((fieldName, literal)) =>
             val index = fieldIndex(fieldName)
             builder.lessThan(index, convertLiteral(index, literal))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case LESS_THAN_OR_EQUAL =>
@@ -93,6 +104,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
           case Some((fieldName, literal)) =>
             val index = fieldIndex(fieldName)
             builder.lessOrEqual(index, convertLiteral(index, literal))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case IN =>
@@ -101,18 +114,24 @@ case class SparkV2FilterConverter(rowType: RowType) {
             val index = fieldIndex(fieldName)
             literals.map(convertLiteral(index, _)).toList.asJava
            builder.in(index, literals.map(convertLiteral(index, _)).toList.asJava)
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case IS_NULL =>
         UnaryPredicate.unapply(sparkPredicate) match {
           case Some(fieldName) =>
             builder.isNull(fieldIndex(fieldName))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case IS_NOT_NULL =>
         UnaryPredicate.unapply(sparkPredicate) match {
           case Some(fieldName) =>
             builder.isNotNull(fieldIndex(fieldName))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case AND =>
@@ -137,6 +156,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
           case Some((fieldName, literal)) =>
             val index = fieldIndex(fieldName)
             builder.startsWith(index, convertLiteral(index, literal))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case STRING_END_WITH =>
@@ -144,6 +165,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
           case Some((fieldName, literal)) =>
             val index = fieldIndex(fieldName)
             builder.endsWith(index, convertLiteral(index, literal))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       case STRING_CONTAINS =>
@@ -151,6 +174,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
           case Some((fieldName, literal)) =>
             val index = fieldIndex(fieldName)
             builder.contains(index, convertLiteral(index, literal))
+          case _ =>
+            throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
         }
 
       // TODO: AlwaysTrue, AlwaysFalse
@@ -201,7 +226,7 @@ case class SparkV2FilterConverter(rowType: RowType) {
   }
 }
 
-object SparkV2FilterConverter {
+object SparkV2FilterConverter extends Logging {
 
   private val EQUAL_TO = "="
   private val EQUAL_NULL_SAFE = "<=>"
@@ -258,6 +283,9 @@ object SparkV2FilterConverter {
       case IN =>
         MultiPredicate.unapply(sparkPredicate) match {
           case Some((fieldName, _)) => partitionKeys.contains(fieldName)
+          case _ =>
+            logWarning(s"Convert $sparkPredicate is unsupported.")
+            false
         }
       case _ => false
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 5540f58e0e..fbb1aa7d8f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -26,9 +26,12 @@ import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilterV2}
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.apache.spark.sql.sources.{AlwaysTrue, And => SourceAnd, EqualNullSafe, EqualTo, Filter => SourceFilter}
@@ -66,6 +69,25 @@ trait ExpressionHelper extends ExpressionHelperBase {
       Some(PredicateBuilder.and(predicates: _*))
     }
   }
+
+  def resolveFilter(
+      spark: SparkSession,
+      relation: DataSourceV2Relation,
+      conditionSql: String): Expression = {
+    val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql)
+    val filter = Filter(unResolvedExpression, relation)
+    spark.sessionState.analyzer.executeAndCheck(filter, new QueryPlanningTracker) match {
+      case filter: Filter =>
+        try {
+          ConstantFolding.apply(filter).asInstanceOf[Filter].condition
+        } catch {
+          case _: Throwable => filter.condition
+        }
+      case _ =>
+        throw new RuntimeException(
+          s"Could not resolve expression $conditionSql in relation: $relation")
+    }
+  }
 }
 
 trait ExpressionHelperBase extends PredicateHelper {
@@ -188,16 +210,6 @@ trait ExpressionHelperBase extends PredicateHelper {
         s"Unsupported update expression: $other, only support update with PrimitiveType and StructType.")
   }
 
-  def resolveFilter(spark: SparkSession, plan: LogicalPlan, conditionSql: String): Expression = {
-    val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql)
-    val filter = Filter(unResolvedExpression, plan)
-    spark.sessionState.analyzer.execute(filter) match {
-      case filter: Filter => filter.condition
-      case _ =>
-        throw new RuntimeException(s"Could not resolve expression $conditionSql in plan: $plan")
-    }
-  }
-
   def splitPruePartitionAndOtherPredicates(
       condition: Expression,
       partitionColumns: Seq[String],
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 4da80dc2da..8d1b35cc12 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -778,6 +778,23 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
     }
   }
 
+  test("Paimon Procedure: type cast in where") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id INT, value STRING, day_part LONG)
+            |TBLPROPERTIES ('compaction.min.file-num'='2')
+            |PARTITIONED BY (day_part)
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'a', 20250810)")
+      sql("INSERT INTO t VALUES (2, 'b', 20250810)")
+      sql("INSERT INTO t VALUES (3, 'c', 20250811)")
+
+      sql("CALL sys.compact(table => 't', where => 'day_part < 20250811 and day_part > 20250809')")
+      val table = loadTable("t")
+      assert(table.snapshotManager().latestSnapshot().commitKind().equals(CommitKind.COMPACT))
+    }
+  }
+
   def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
     table.snapshotManager().latestSnapshot().commitKind()
   }
