This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 69ae696159 [core][spark] Introduce cast transform (#6581)
69ae696159 is described below
commit 69ae69615902436d9ad333593f4c488125cb8c12
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Nov 11 10:51:03 2025 +0800
[core][spark] Introduce cast transform (#6581)
---
 .../org/apache/paimon/predicate/CastTransform.java | 112 +++++++++++++++++++++
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |  10 +-
 .../spark/util/SparkExpressionConverter.scala      |  48 +++++----
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  23 +++++
 4 files changed, 174 insertions(+), 19 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/CastTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/CastTransform.java
new file mode 100644
index 0000000000..37141011f3
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/CastTransform.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.paimon.utils.InternalRowUtils.get;
+
+/** Transform that casts a field to a new type. */
+public class CastTransform implements Transform {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FieldRef fieldRef;
+    private final DataType type;
+    private transient CastExecutor<Object, Object> cast;
+
+    private CastTransform(FieldRef fieldRef, DataType type, CastExecutor<Object, Object> cast) {
+        this.fieldRef = fieldRef;
+        this.type = type;
+        this.cast = cast;
+    }
+
+    public static Optional<Transform> tryCreate(FieldRef fieldRef, DataType type) {
+        if (fieldRef.type().equals(type)) {
+            return Optional.of(new FieldTransform(fieldRef));
+        }
+
+        @SuppressWarnings("unchecked")
+        CastExecutor<Object, Object> cast =
+                (CastExecutor<Object, Object>) CastExecutors.resolve(fieldRef.type(), type);
+        if (cast == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new CastTransform(fieldRef, type, cast));
+        }
+    }
+
+    @Override
+    public List<Object> inputs() {
+        return Collections.singletonList(fieldRef);
+    }
+
+    @Override
+    public DataType outputType() {
+        return type;
+    }
+
+    @Override
+    public Object transform(InternalRow row) {
+        return cast.cast(get(row, fieldRef.index(), fieldRef.type()));
+    }
+
+    @Override
+    public Transform copyWithNewInputs(List<Object> inputs) {
+        assert inputs.size() == 1;
+        return new CastTransform((FieldRef) inputs.get(0), type, cast);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CastTransform that = (CastTransform) o;
+        return Objects.equals(fieldRef, that.fieldRef) && Objects.equals(type, that.type);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fieldRef, type);
+    }
+
+    @Override
+    public String toString() {
+        return "CAST( " + fieldRef + " AS " + type + ")";
+    }
+
+    private void readObject(java.io.ObjectInputStream in)
+            throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        @SuppressWarnings("unchecked")
+        CastExecutor<Object, Object> resolved =
+                (CastExecutor<Object, Object>) CastExecutors.resolve(fieldRef.type(), type);
+        this.cast = resolved;
+    }
+}
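
A minimal usage sketch of the new class may help here. It is not part of the patch: the sketch class, field index, and row values are hypothetical, and it assumes paimon-common's FieldRef(index, name, type) constructor and the GenericRow/DataTypes helpers:

    import org.apache.paimon.data.BinaryString;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.predicate.CastTransform;
    import org.apache.paimon.predicate.FieldRef;
    import org.apache.paimon.predicate.Transform;
    import org.apache.paimon.types.DataTypes;

    import java.util.Optional;

    public class CastTransformSketch {
        public static void main(String[] args) {
            // A STRING field "dt" at index 2, cast to INT (mirrors the test at
            // the end of this patch, where a STRING partition column is
            // compared against an int).
            FieldRef dt = new FieldRef(2, "dt", DataTypes.STRING());
            Optional<Transform> transform = CastTransform.tryCreate(dt, DataTypes.INT());

            // tryCreate returns a FieldTransform when the types already match,
            // Optional.empty() when CastExecutors cannot resolve the cast,
            // and a CastTransform otherwise.
            transform.ifPresent(t -> {
                Object v = t.transform(GenericRow.of(1, 100, BinaryString.fromString("1")));
                System.out.println(v); // expected: 1 (as Integer)
            });
        }
    }

Note the design: the CastExecutor is transient and re-resolved in readObject, which keeps the transform serializable without shipping the executor itself.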
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index d4f0b0cfe0..f2a12f1b41 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -128,6 +128,13 @@ abstract class PaimonBaseScan(
     } else {
       ""
     }
+
+    val reservedFiltersStr = if (reservedFilters.nonEmpty) {
+      ", ReservedFilters: [" + reservedFilters.mkString(",") + "]"
+    } else {
+      ""
+    }
+
     val pushedTopNFilterStr = if (pushDownTopN.nonEmpty) {
       s", PushedTopNFilter: [${pushDownTopN.get.toString}]"
     } else {
@@ -169,7 +176,8 @@ abstract class PaimonBaseScan(
       ""
     }
 
-    s"PaimonScan: [${table.name}]" + latestSnapshotIdStr + currentSnapshotIdStr + pushedFiltersStr + pushedTopNFilterStr +
+    s"PaimonScan: [${table.name}]" + latestSnapshotIdStr + currentSnapshotIdStr +
+      pushedFiltersStr + reservedFiltersStr + pushedTopNFilterStr +
       pushDownLimit.map(limit => s", Limit: [$limit]").getOrElse("")
   }
 }
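
With this change, reserved filters now surface in the scan's description string, between the pushed filters and the TopN/limit parts. For a hypothetical scan of a table t (the exact filter text depends on each filter's toString), the description would read roughly:

    PaimonScan: [t], PushedFilters: [...], ReservedFilters: [...], Limit: [10]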
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
index 71bbea254c..32409544b6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
@@ -19,43 +19,55 @@
 package org.apache.paimon.spark.util
 
 import org.apache.paimon.data.{BinaryString, Decimal, Timestamp}
-import org.apache.paimon.predicate.{ConcatTransform, FieldRef, FieldTransform, Transform, UpperTransform}
-import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.predicate._
+import org.apache.paimon.spark.{PaimonImplicits, SparkTypeUtils}
 import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType
 import org.apache.paimon.types.{DecimalType, RowType}
 import org.apache.paimon.types.DataTypeRoot._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference}
+import org.apache.spark.sql.connector.expressions.{Cast, Expression, GeneralScalarExpression, Literal, NamedReference}
 
 import scala.collection.JavaConverters._
 
 object SparkExpressionConverter {
 
-  // Supported transform names
+  import PaimonImplicits._
+
+  // Supported general scalar transform names
   private val CONCAT = "CONCAT"
   private val UPPER = "UPPER"
 
   /** Convert Spark [[Expression]] to Paimon [[Transform]], return None if not supported. */
   def toPaimonTransform(exp: Expression, rowType: RowType): Option[Transform] = {
+
+    def convertChildren(children: Seq[Expression]) = {
+      val converted = children.map {
+        case n: NamedReference => Some(toPaimonFieldRef(n, rowType))
+        case l: Literal[_] => Some(toPaimonLiteral(l))
+        case _ => None
+      }
+      if (converted.exists(_.isEmpty)) {
+        None
+      } else {
+        Some(converted.map(_.get).asJava)
+      }
+    }
+
     exp match {
       case n: NamedReference => Some(new FieldTransform(toPaimonFieldRef(n, rowType)))
       case s: GeneralScalarExpression =>
         s.name() match {
-          case CONCAT =>
-            val inputs = exp.children().map {
-              case n: NamedReference => toPaimonFieldRef(n, rowType)
-              case l: Literal[_] => toPaimonLiteral(l)
-              case _ => return None
-            }
-            Some(new ConcatTransform(inputs.toList.asJava))
-          case UPPER =>
-            val inputs = exp.children().map {
-              case n: NamedReference => toPaimonFieldRef(n, rowType)
-              case l: Literal[_] => toPaimonLiteral(l)
-              case _ => return None
-            }
-            Some(new UpperTransform(inputs.toList.asJava))
+          case CONCAT => convertChildren(s.children()).map(i => new ConcatTransform(i))
+          case UPPER => convertChildren(s.children()).map(i => new UpperTransform(i))
+          case _ => None
+        }
+      case c: Cast =>
+        c.expression() match {
+          case n: NamedReference =>
+            CastTransform.tryCreate(
+              toPaimonFieldRef(n, rowType),
+              SparkTypeUtils.toPaimonType(c.dataType()))
           case _ => None
         }
       case _ => None
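
For orientation, a rough sketch of the Spark-side expression shape the new branch accepts. This is an illustration, not part of the patch: the column name is made up, and it assumes Spark's public Expressions.column helper and the connector Cast(Expression, DataType) constructor:

    import org.apache.spark.sql.connector.expressions.Cast;
    import org.apache.spark.sql.connector.expressions.Expressions;
    import org.apache.spark.sql.connector.expressions.NamedReference;
    import org.apache.spark.sql.types.DataTypes;

    public class CastExpressionSketch {
        public static void main(String[] args) {
            // Under ANSI mode, "WHERE dt = 1" on a STRING column reaches the
            // data source as roughly CAST(dt AS INT) = 1; the Cast node below
            // is the child that toPaimonTransform now maps to
            // CastTransform.tryCreate(...).
            NamedReference dt = Expressions.column("dt");
            Cast cast = new Cast(dt, DataTypes.IntegerType);
            System.out.println(cast.describe()); // e.g. CAST(dt AS int)
        }
    }

Only casts whose child is a plain column reference are handled; any other child still falls through to None.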
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index be2bf18bef..c23694e1d1 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -159,6 +159,29 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test(s"Paimon push down: apply CAST") {
+    if (gteqSpark3_4) {
+      withSparkSQLConf("spark.sql.ansi.enabled" -> "true") {
+        withTable("t") {
+          sql("""
+                |CREATE TABLE t (id int, value int, dt STRING)
+                |using paimon
+                |PARTITIONED BY (dt)
+                |""".stripMargin)
+
+          sql("""
+                |INSERT INTO t values
+                |(1, 100, '1')
+                |""".stripMargin)
+
+          val q = "SELECT * FROM t WHERE dt = 1"
+          assert(!checkFilterExists(q))
+          checkAnswer(sql(q), Seq(Row(1, 100, "1")))
+        }
+      }
+    }
+  }
+
   test("Paimon pushDown: limit for append-only tables with deletion vector") {
     withTable("dv_test") {
       spark.sql(