This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3c0dd803f5 [spark] Introduce substring transform (#7170)
3c0dd803f5 is described below
commit 3c0dd803f56469e1a790dbfdaee205231bb1a11e
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Feb 2 11:33:51 2026 +0800
[spark] Introduce substring transform (#7170)
---
.../paimon/predicate/SubstringTransform.java | 127 +++++++++++++++++++++
.../paimon/predicate/SubstringTransformTest.java | 102 +++++++++++++++++
.../spark/util/SparkExpressionConverter.scala | 2 +
.../paimon/spark/sql/PaimonPushDownTestBase.scala | 42 +++++++
4 files changed, 273 insertions(+)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/SubstringTransform.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/SubstringTransform.java
new file mode 100644
index 0000000000..00431a61f3
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/SubstringTransform.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING;
+import static org.apache.paimon.types.DataTypeFamily.INTEGER_NUMERIC;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Substring {@link Transform}.
+ *
+ * <p>The inputs are {@code (source, begin)} or {@code (source, begin, length)}. Every input is
+ * either a {@link FieldRef}, resolved against the row being transformed, or a literal value.
+ *
+ * <p>Position rules (they mirror Spark SQL's {@code substring}, the engine that pushes this
+ * transform down; NOTE(review): confirm no other caller relies on different rules):
+ *
+ * <ul>
+ *   <li>{@code begin} is 1-based; {@code 0} is treated like {@code 1}.
+ *   <li>A negative {@code begin} counts backwards from the end of the string.
+ *   <li>A missing {@code length} means "up to the end of the string".
+ *   <li>A range that selects nothing (start past the end, non-positive length) yields an empty
+ *       string rather than an error.
+ *   <li>A null source, begin or length yields {@code null}.
+ *   <li>Positions are counted in Unicode code points, so surrogate pairs are never split.
+ * </ul>
+ */
+public class SubstringTransform implements Transform {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String NAME = "SUBSTRING";
+
+    private final List<Object> inputs;
+
+    /**
+     * Creates the transform.
+     *
+     * @param inputs two or three inputs: source, begin and optionally length
+     * @throws IllegalArgumentException if the number of inputs is neither 2 nor 3
+     */
+    public SubstringTransform(List<Object> inputs) {
+        checkArgument(inputs.size() == 2 || inputs.size() == 3);
+        this.inputs = inputs;
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    /**
+     * Evaluates the substring for one row.
+     *
+     * @param row the row used to resolve {@link FieldRef} inputs
+     * @return the selected part of the source as a {@link BinaryString}, an empty string when the
+     *     range selects nothing, or {@code null} when any input evaluates to null
+     */
+    @Override
+    public final Object transform(InternalRow row) {
+        BinaryString source = resolveSource(row);
+        if (source == null) {
+            return null;
+        }
+
+        Integer begin = resolveInt(inputs.get(1), row);
+        if (begin == null) {
+            return null;
+        }
+
+        Integer length = null;
+        if (inputs.size() == 3) {
+            length = resolveInt(inputs.get(2), row);
+            if (length == null) {
+                return null;
+            }
+        }
+
+        String text = source.toString();
+        int charCount = text.codePointCount(0, text.length());
+
+        // Zero-based start in code points. It may still be negative when a negative begin
+        // reaches before the first character; it is clamped below.
+        long start;
+        if (begin > 0) {
+            start = begin - 1L;
+        } else if (begin < 0) {
+            start = (long) charCount + begin;
+        } else {
+            start = 0L;
+        }
+
+        // Exclusive end; long arithmetic so that begin + length cannot overflow.
+        long end = length == null ? charCount : start + length;
+
+        long from = Math.max(start, 0L);
+        long to = Math.min(end, (long) charCount);
+        if (from >= to) {
+            return BinaryString.EMPTY_UTF8;
+        }
+
+        // Translate code point positions into char offsets of the Java string.
+        int fromOffset = text.offsetByCodePoints(0, (int) from);
+        int toOffset = text.offsetByCodePoints(fromOffset, (int) (to - from));
+        return BinaryString.fromString(text.substring(fromOffset, toOffset));
+    }
+
+    /** Resolves the source string from the first input; returns null for a null source. */
+    private BinaryString resolveSource(InternalRow row) {
+        Object source = inputs.get(0);
+        if (source instanceof FieldRef) {
+            FieldRef ref = (FieldRef) source;
+            checkArgument(ref.type().is(CHARACTER_STRING));
+            // The null check must look at the referenced field, not at a fixed position.
+            int index = ref.index();
+            return row.isNullAt(index) ? null : row.getString(index);
+        }
+        return (BinaryString) source;
+    }
+
+    /** Resolves an integer input (field reference or literal); returns null for a null value. */
+    private static Integer resolveInt(Object input, InternalRow row) {
+        if (input == null) {
+            return null;
+        }
+        if (input instanceof FieldRef) {
+            FieldRef ref = (FieldRef) input;
+            checkArgument(ref.type().is(INTEGER_NUMERIC));
+            if (row.isNullAt(ref.index())) {
+                return null;
+            }
+            return row.getInt(ref.index());
+        }
+        return Integer.parseInt(input.toString());
+    }
+
+    @Override
+    public Transform copyWithNewInputs(List<Object> inputs) {
+        return new SubstringTransform(inputs);
+    }
+
+    @Override
+    public final List<Object> inputs() {
+        return inputs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SubstringTransform that = (SubstringTransform) o;
+        return Objects.equals(inputs, that.inputs);
+    }
+
+    @Override
+    public DataType outputType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(inputs);
+    }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/predicate/SubstringTransformTest.java
b/paimon-common/src/test/java/org/apache/paimon/predicate/SubstringTransformTest.java
new file mode 100644
index 0000000000..8ddb74440b
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/predicate/SubstringTransformTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SubstringTransform}. */
+class SubstringTransformTest {
+
+    @Test
+    public void testNullInputs() {
+        // BinaryString.fromString(null) is expected to produce a null source literal,
+        // which must result in a null output.
+        assertThat(substring(BinaryString.fromString(null), 1)).isNull();
+    }
+
+    @Test
+    public void testNormalInputs() {
+        BinaryString hello = BinaryString.fromString("hello");
+
+        // substring('hello', 2)
+        assertThat(substring(hello, 2)).isEqualTo(BinaryString.fromString("ello"));
+
+        // substring('hello', 2, 3)
+        assertThat(substring(hello, 2, 3)).isEqualTo(BinaryString.fromString("ell"));
+
+        // substring('hello', 2, 100): a length running past the end is cut at the end
+        assertThat(substring(hello, 2, 100)).isEqualTo(BinaryString.fromString("ello"));
+
+        // substring('hello', 5, 1)
+        assertThat(substring(hello, 5, 1)).isEqualTo(BinaryString.fromString("o"));
+
+        // substring('hello', 10): a start past the end gives an empty string
+        assertThat(substring(hello, 10)).isEqualTo(BinaryString.fromString(""));
+    }
+
+    @Test
+    public void testSubstringRefInputs() {
+        // Source, begin and length all come from the row: fields 1, 3 and 4.
+        List<Object> inputs = new ArrayList<>();
+        inputs.add(new FieldRef(1, "f1", DataTypes.STRING()));
+        inputs.add(new FieldRef(3, "f3", DataTypes.INT()));
+        inputs.add(new FieldRef(4, "f4", DataTypes.INT()));
+        SubstringTransform transform = new SubstringTransform(inputs);
+
+        GenericRow row =
+                GenericRow.of(
+                        BinaryString.fromString(""),
+                        BinaryString.fromString("hello"),
+                        BinaryString.fromString(""),
+                        2,
+                        3);
+
+        // Equivalent to substring('hello', 2, 3).
+        assertThat(transform.transform(row)).isEqualTo(BinaryString.fromString("ell"));
+    }
+
+    /**
+     * Evaluates a {@link SubstringTransform} built from literal inputs against an empty row. A
+     * fresh input list is created per call so that cases cannot influence each other.
+     */
+    private static Object substring(Object... literals) {
+        List<Object> inputs = new ArrayList<>();
+        for (Object literal : literals) {
+            inputs.add(literal);
+        }
+        return new SubstringTransform(inputs).transform(GenericRow.of());
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
index 83a75c37dd..347bde6513 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala
@@ -38,6 +38,7 @@ object SparkExpressionConverter {
private val CONCAT = "CONCAT"
private val UPPER = "UPPER"
private val LOWER = "LOWER"
+ private val SUBSTRING = "SUBSTRING"
/** Convert Spark [[Expression]] to Paimon [[Transform]], return None if not
supported. */
def toPaimonTransform(exp: Expression, rowType: RowType): Option[Transform]
= {
@@ -62,6 +63,7 @@ object SparkExpressionConverter {
case CONCAT => convertChildren(s.children()).map(i => new
ConcatTransform(i))
case UPPER => convertChildren(s.children()).map(i => new
UpperTransform(i))
case LOWER => convertChildren(s.children()).map(i => new
LowerTransform(i))
+ case SUBSTRING => convertChildren(s.children()).map(i => new
SubstringTransform(i))
case _ => None
}
case c: Cast =>
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 3902e8c4ad..1731231a48 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -190,6 +190,48 @@ abstract class PaimonPushDownTestBase extends
PaimonSparkTestBase with AdaptiveS
}
}
+ // Checks that a SUBSTRING predicate on the partition column is pushed down to Paimon
+ // and that the query still returns the expected row.
+ test(s"Paimon push down: apply SUBSTRING") {
+ // The SUBSTRING push-down is only exercised on Spark 3.4 and later.
+ if (gteqSpark3_4) {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id int, value int, dt STRING)
+ |using paimon
+ |PARTITIONED BY (dt)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t values
+ |(1, 100, '_hello')
+ |""".stripMargin)
+
+ // Two-argument form: everything from position 2 onwards.
+ val q =
+ """
+ |SELECT * FROM t
+ |WHERE SUBSTRING(dt, 2) = 'hello'
+ |""".stripMargin
+ // Presumably: no residual Spark-side filter is left once the predicate is pushed down.
+ assert(!checkFilterExists(q))
+
+ checkAnswer(
+ spark.sql(q),
+ Seq(Row(1, 100, "_hello"))
+ )
+
+ // Three-argument form: two characters starting at position 2.
+ val q1 =
+ """
+ |SELECT * FROM t
+ |WHERE SUBSTRING(dt, 2, 2) = 'he'
+ |""".stripMargin
+ assert(!checkFilterExists(q1))
+
+ checkAnswer(
+ spark.sql(q1),
+ Seq(Row(1, 100, "_hello"))
+ )
+ }
+ }
+ }
+
test(s"Paimon push down: apply CAST") {
if (gteqSpark3_4) {
withSparkSQLConf("spark.sql.ansi.enabled" -> "true") {