This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7615188e04 [spark] Allow automatic casting in the where condition of compact procedure (#6056)
7615188e04 is described below
commit 7615188e04f197266d404f96050b3e5f07473505
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Aug 12 09:47:54 2025 +0800
[spark] Allow automatic casting in the where condition of compact procedure (#6056)
---
.../analysis/expressions/ExpressionHelper.scala | 23 ++++++++++++++++
.../paimon/spark/SparkV2FilterConverter.scala | 30 +++++++++++++++++++-
.../analysis/expressions/ExpressionHelper.scala | 32 +++++++++++++++-------
.../spark/procedure/CompactProcedureTestBase.scala | 17 ++++++++++++
4 files changed, 91 insertions(+), 11 deletions(-)
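For context before the diff: the compact procedure previously did not accept a where condition whose literal type differs from the partition column type (for example an integer literal compared against a LONG column); this change resolves the condition against the table relation so Spark inserts the cast, and folds constants so the predicate can still be pushed down. A minimal usage sketch, assuming an active SparkSession spark with the Paimon Spark catalog configured; the table and column mirror the test added at the end of this diff:

    // illustrative only; 't' and 'day_part' come from the new CompactProcedureTestBase test
    spark.sql("CREATE TABLE t (id INT, value STRING, day_part LONG) PARTITIONED BY (day_part)")
    spark.sql("INSERT INTO t VALUES (1, 'a', 20250810)")
    spark.sql("INSERT INTO t VALUES (2, 'b', 20250810)")
    // 20250811 parses as an INT literal; the analyzer now casts it against the LONG
    // partition column and ConstantFolding collapses the cast before pushdown.
    spark.sql("CALL sys.compact(table => 't', where => 'day_part < 20250811')")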
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 56223c36cd..6cc1ce4794 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -23,7 +23,11 @@ import org.apache.paimon.spark.SparkFilterConverter
import org.apache.paimon.types.RowType
import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilter}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
trait ExpressionHelper extends ExpressionHelperBase {
@@ -52,4 +56,23 @@ trait ExpressionHelper extends ExpressionHelperBase {
Some(PredicateBuilder.and(predicates: _*))
}
}
+
+ def resolveFilter(
+ spark: SparkSession,
+ relation: DataSourceV2Relation,
+ conditionSql: String): Expression = {
+ val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql)
+ val filter = Filter(unResolvedExpression, relation)
+ spark.sessionState.analyzer.execute(filter) match {
+ case filter: Filter =>
+ try {
+ ConstantFolding.apply(filter).asInstanceOf[Filter].condition
+ } catch {
+ case _: Throwable => filter.condition
+ }
+ case _ =>
+ throw new RuntimeException(
+ s"Could not resolve expression $conditionSql in relation: $relation")
+ }
+ }
}
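In effect, the added resolveFilter parses the raw condition, resolves it against the table's DataSourceV2Relation, and then applies ConstantFolding so the casts the analyzer inserts around literals collapse back into plain literals of the column type. A rough sketch of the same call pattern outside the trait, assuming an active SparkSession named spark and an already-resolved DataSourceV2Relation named relation for the Paimon table (these names are illustrative, not part of the commit):

    import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
    import org.apache.spark.sql.catalyst.plans.logical.Filter

    val parsed = spark.sessionState.sqlParser.parseExpression("day_part < 20250811")
    val analyzed = spark.sessionState.analyzer.execute(Filter(parsed, relation))
    val condition = analyzed match {
      case f: Filter => ConstantFolding(f).asInstanceOf[Filter].condition
      case other => throw new RuntimeException(s"Unexpected plan: $other")
    }
    // With a LONG day_part column, day_part < cast(20250811 as bigint) folds to
    // day_part < 20250811L, which SparkV2FilterConverter can then push down.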
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
index a3615e51d3..caf6986f1e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSpa
import org.apache.paimon.types.{DataTypeRoot, DecimalType, RowType}
import org.apache.paimon.types.DataTypeRoot._
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.expressions.{Literal, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate}
@@ -54,6 +55,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
// TODO deal with isNaN
val index = fieldIndex(fieldName)
builder.equal(index, convertLiteral(index, literal))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case EQUAL_NULL_SAFE =>
@@ -65,6 +68,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
} else {
builder.equal(index, convertLiteral(index, literal))
}
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case GREATER_THAN =>
@@ -72,6 +77,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
case Some((fieldName, literal)) =>
val index = fieldIndex(fieldName)
builder.greaterThan(index, convertLiteral(index, literal))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case GREATER_THAN_OR_EQUAL =>
@@ -79,6 +86,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
case Some((fieldName, literal)) =>
val index = fieldIndex(fieldName)
builder.greaterOrEqual(index, convertLiteral(index, literal))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case LESS_THAN =>
@@ -86,6 +95,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
case Some((fieldName, literal)) =>
val index = fieldIndex(fieldName)
builder.lessThan(index, convertLiteral(index, literal))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case LESS_THAN_OR_EQUAL =>
@@ -93,6 +104,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
case Some((fieldName, literal)) =>
val index = fieldIndex(fieldName)
builder.lessOrEqual(index, convertLiteral(index, literal))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case IN =>
@@ -101,18 +114,24 @@ case class SparkV2FilterConverter(rowType: RowType) {
val index = fieldIndex(fieldName)
literals.map(convertLiteral(index, _)).toList.asJava
builder.in(index, literals.map(convertLiteral(index, _)).toList.asJava)
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case IS_NULL =>
UnaryPredicate.unapply(sparkPredicate) match {
case Some(fieldName) =>
builder.isNull(fieldIndex(fieldName))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case IS_NOT_NULL =>
UnaryPredicate.unapply(sparkPredicate) match {
case Some(fieldName) =>
builder.isNotNull(fieldIndex(fieldName))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case AND =>
@@ -137,6 +156,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
case Some((fieldName, literal)) =>
val index = fieldIndex(fieldName)
builder.startsWith(index, convertLiteral(index, literal))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case STRING_END_WITH =>
@@ -144,6 +165,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
case Some((fieldName, literal)) =>
val index = fieldIndex(fieldName)
builder.endsWith(index, convertLiteral(index, literal))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
case STRING_CONTAINS =>
@@ -151,6 +174,8 @@ case class SparkV2FilterConverter(rowType: RowType) {
case Some((fieldName, literal)) =>
val index = fieldIndex(fieldName)
builder.contains(index, convertLiteral(index, literal))
+ case _ =>
+ throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.")
}
// TODO: AlwaysTrue, AlwaysFalse
@@ -201,7 +226,7 @@ case class SparkV2FilterConverter(rowType: RowType) {
}
}
-object SparkV2FilterConverter {
+object SparkV2FilterConverter extends Logging {
private val EQUAL_TO = "="
private val EQUAL_NULL_SAFE = "<=>"
@@ -258,6 +283,9 @@ object SparkV2FilterConverter {
case IN =>
MultiPredicate.unapply(sparkPredicate) match {
case Some((fieldName, _)) => partitionKeys.contains(fieldName)
+ case _ =>
+ logWarning(s"Convert $sparkPredicate is unsupported.")
+ false
}
case _ => false
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 5540f58e0e..fbb1aa7d8f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -26,9 +26,12 @@ import org.apache.paimon.types.RowType
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilterV2}
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.paimon.shims.SparkShimLoader
import org.apache.spark.sql.sources.{AlwaysTrue, And => SourceAnd, EqualNullSafe, EqualTo, Filter => SourceFilter}
@@ -66,6 +69,25 @@ trait ExpressionHelper extends ExpressionHelperBase {
Some(PredicateBuilder.and(predicates: _*))
}
}
+
+ def resolveFilter(
+ spark: SparkSession,
+ relation: DataSourceV2Relation,
+ conditionSql: String): Expression = {
+ val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql)
+ val filter = Filter(unResolvedExpression, relation)
+ spark.sessionState.analyzer.executeAndCheck(filter, new QueryPlanningTracker) match {
+ case filter: Filter =>
+ try {
+ ConstantFolding.apply(filter).asInstanceOf[Filter].condition
+ } catch {
+ case _: Throwable => filter.condition
+ }
+ case _ =>
+ throw new RuntimeException(
+ s"Could not resolve expression $conditionSql in relation: $relation")
+ }
+ }
}
trait ExpressionHelperBase extends PredicateHelper {
@@ -188,16 +210,6 @@ trait ExpressionHelperBase extends PredicateHelper {
s"Unsupported update expression: $other, only support update with
PrimitiveType and StructType.")
}
- def resolveFilter(spark: SparkSession, plan: LogicalPlan, conditionSql: String): Expression = {
- val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql)
- val filter = Filter(unResolvedExpression, plan)
- spark.sessionState.analyzer.execute(filter) match {
- case filter: Filter => filter.condition
- case _ =>
- throw new RuntimeException(s"Could not resolve expression $conditionSql in plan: $plan")
- }
- }
-
def splitPruePartitionAndOtherPredicates(
condition: Expression,
partitionColumns: Seq[String],
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 4da80dc2da..8d1b35cc12 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -778,6 +778,23 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
}
}
+ test("Paimon Procedure: type cast in where") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id INT, value STRING, day_part LONG)
+ |TBLPROPERTIES ('compaction.min.file-num'='2')
+ |PARTITIONED BY (day_part)
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'a', 20250810)")
+ sql("INSERT INTO t VALUES (2, 'b', 20250810)")
+ sql("INSERT INTO t VALUES (3, 'c', 20250811)")
+
+ sql("CALL sys.compact(table => 't', where => 'day_part < 20250811 and
day_part > 20250809')")
+ val table = loadTable("t")
+ assert(table.snapshotManager().latestSnapshot().commitKind().equals(CommitKind.COMPACT))
+ }
+ }
+
def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
table.snapshotManager().latestSnapshot().commitKind()
}