This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f8277d9d9 [core] Introduce transform predicate for complex predicate 
(#6498)
6f8277d9d9 is described below

commit 6f8277d9d978e796944f12c721be44b55b31ca0e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Oct 30 17:24:14 2025 +0800

    [core] Introduce transform predicate for complex predicate (#6498)
---
 .../java/org/apache/paimon/data/BinaryString.java  | 104 ++++++++++++++++++
 .../paimon/fileindex/FileIndexPredicate.java       |  11 ++
 .../apache/paimon/fileindex/FileIndexReader.java   |   6 ++
 ...{PredicateVisitor.java => ConcatTransform.java} |  19 +++-
 ...redicateVisitor.java => ConcatWsTransform.java} |  23 +++-
 .../paimon/predicate/LeafPredicateExtractor.java   |   5 +
 .../predicate/OnlyPartitionKeyEqualVisitor.java    |   5 +
 .../predicate/PartitionPredicateVisitor.java       |  13 +++
 .../predicate/PredicateProjectionConverter.java    |   5 +
 .../paimon/predicate/PredicateReplaceVisitor.java  |   6 ++
 .../apache/paimon/predicate/PredicateVisitor.java  |   2 +
 .../apache/paimon/predicate/StringTransform.java   | 100 +++++++++++++++++
 .../{PredicateVisitor.java => Transform.java}      |  16 ++-
 .../paimon/predicate/TransformPredicate.java       | 120 +++++++++++++++++++++
 .../org/apache/paimon/data/BinaryStringTest.java   |  38 +++++++
 .../paimon/predicate/ConcatTransformTest.java      |  75 +++++++++++++
 .../paimon/predicate/ConcatWsTransformTest.java    |  78 ++++++++++++++
 .../paimon/predicate/TransformPredicateTest.java   |  81 ++++++++++++++
 .../orc/filter/OrcPredicateFunctionVisitor.java    |   6 ++
 .../parquet/filter2/predicate/ParquetFilters.java  |   6 ++
 20 files changed, 707 insertions(+), 12 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/BinaryString.java 
b/paimon-common/src/main/java/org/apache/paimon/data/BinaryString.java
index 6ccecc9256..3756ac9be9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryString.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryString.java
@@ -1012,4 +1012,108 @@ public final class BinaryString extends BinarySection 
implements Comparable<Bina
     public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
         return new String(bytes, offset, len, StandardCharsets.UTF_8);
     }
+
+    /**
+     * Concatenates input strings together into a single string. Returns NULL 
if any argument is
+     * NULL.
+     */
+    public static BinaryString concat(BinaryString... inputs) {
+        return concat(Arrays.asList(inputs));
+    }
+
+    /**
+     * Concatenates input strings together into a single string. Returns NULL 
if any argument is
+     * NULL.
+     */
+    public static BinaryString concat(Iterable<BinaryString> inputs) {
+        // Compute the total length of the result.
+        int totalLength = 0;
+        for (BinaryString input : inputs) {
+            if (input == null) {
+                return null;
+            }
+
+            totalLength += input.getSizeInBytes();
+        }
+
+        // Allocate a new byte array, and copy the inputs one by one into it.
+        final byte[] result = new byte[totalLength];
+        int offset = 0;
+        for (BinaryString input : inputs) {
+            if (input != null) {
+                int len = input.getSizeInBytes();
+                MemorySegmentUtils.copyToBytes(
+                        input.getSegments(), input.getOffset(), result, 
offset, len);
+                offset += len;
+            }
+        }
+        return fromBytes(result);
+    }
+
+    /**
+     * Concatenates input strings together into a single string using the 
separator. Returns NULL if
+     * the separator is NULL.
+     *
+     * <p>Note: CONCAT_WS() does not skip any empty strings, however it does 
skip any NULL values
+     * after the separator. For example, concat_ws(",", "a", null, "c") would 
yield "a,c".
+     */
+    public static BinaryString concatWs(BinaryString separator, 
BinaryString... inputs) {
+        return concatWs(separator, Arrays.asList(inputs));
+    }
+
+    /**
+     * Concatenates input strings together into a single string using the 
separator. Returns NULL if
+     * the separator is NULL.
+     *
+     * <p>Note: CONCAT_WS() does not skip any empty strings, however it does 
skip any NULL values
+     * after the separator. For example, concat_ws(",", "a", null, "c") would 
yield "a,c".
+     */
+    public static BinaryString concatWs(BinaryString separator, 
Iterable<BinaryString> inputs) {
+        if (null == separator) {
+            return null;
+        }
+
+        int numInputBytes = 0; // total number of bytes from the inputs
+        int numInputs = 0; // number of non-null inputs
+        for (BinaryString input : inputs) {
+            if (input != null) {
+                numInputBytes += input.getSizeInBytes();
+                numInputs++;
+            }
+        }
+
+        if (numInputs == 0) {
+            // Return an empty string if there is no input, or all the inputs 
are null.
+            return EMPTY_UTF8;
+        }
+
+        // Allocate a new byte array, and copy the inputs one by one into it.
+        // The size of the new array is the size of all inputs, plus the 
separators.
+        final byte[] result =
+                new byte[numInputBytes + (numInputs - 1) * 
separator.getSizeInBytes()];
+        int offset = 0;
+
+        int j = 0;
+        for (BinaryString input : inputs) {
+            if (input != null) {
+                int len = input.getSizeInBytes();
+                MemorySegmentUtils.copyToBytes(
+                        input.getSegments(), input.getOffset(), result, 
offset, len);
+                offset += len;
+
+                j++;
+                // Add separator if this is not the last input.
+                if (j < numInputs) {
+                    MemorySegmentUtils.copyToBytes(
+                            separator.getSegments(),
+                            separator.getOffset(),
+                            result,
+                            offset,
+                            separator.getSizeInBytes());
+                    offset += separator.getSizeInBytes();
+                }
+            }
+        }
+        return fromBytes(result);
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index 8b93da7444..ef32f49cfb 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -31,6 +31,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.predicate.TransformPredicate;
 import org.apache.paimon.types.RowType;
 
 import org.slf4j.Logger;
@@ -131,6 +132,11 @@ public class FileIndexPredicate implements Closeable {
                         }
                         return names;
                     }
+
+                    @Override
+                    public Set<String> visit(TransformPredicate predicate) {
+                        throw new UnsupportedOperationException();
+                    }
                 });
     }
 
@@ -197,5 +203,10 @@ public class FileIndexPredicate implements Closeable {
                 return compoundResult == null ? REMAIN : compoundResult;
             }
         }
+
+        @Override
+        public FileIndexResult visit(TransformPredicate predicate) {
+            throw new UnsupportedOperationException();
+        }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
index fbb8efb777..062e9d9bd6 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.fileindex;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.FunctionVisitor;
 import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.predicate.TransformPredicate;
 
 import java.util.List;
 
@@ -124,4 +125,9 @@ public abstract class FileIndexReader implements 
FunctionVisitor<FileIndexResult
     public FileIndexResult visitTopN(TopN topN, FileIndexResult result) {
         return REMAIN;
     }
+
+    @Override
+    public FileIndexResult visit(TransformPredicate predicate) {
+        return REMAIN;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatTransform.java
similarity index 67%
copy from 
paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
copy to 
paimon-common/src/main/java/org/apache/paimon/predicate/ConcatTransform.java
index c741c05d46..4c99df2dfc 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatTransform.java
@@ -18,10 +18,21 @@
 
 package org.apache.paimon.predicate;
 
-/** A visitor to visit {@link Predicate}. */
-public interface PredicateVisitor<T> {
+import org.apache.paimon.data.BinaryString;
 
-    T visit(LeafPredicate predicate);
+import java.util.List;
 
-    T visit(CompoundPredicate predicate);
+/** Concat {@link Transform}. */
+public class ConcatTransform extends StringTransform {
+
+    private static final long serialVersionUID = 1L;
+
+    public ConcatTransform(List<Object> inputs) {
+        super(inputs);
+    }
+
+    @Override
+    public BinaryString transform(List<BinaryString> inputs) {
+        return BinaryString.concat(inputs);
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatWsTransform.java
similarity index 57%
copy from 
paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
copy to 
paimon-common/src/main/java/org/apache/paimon/predicate/ConcatWsTransform.java
index c741c05d46..b121799cd8 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/ConcatWsTransform.java
@@ -18,10 +18,25 @@
 
 package org.apache.paimon.predicate;
 
-/** A visitor to visit {@link Predicate}. */
-public interface PredicateVisitor<T> {
+import org.apache.paimon.data.BinaryString;
 
-    T visit(LeafPredicate predicate);
+import java.util.List;
 
-    T visit(CompoundPredicate predicate);
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** ConcatWs {@link Transform}. */
+public class ConcatWsTransform extends StringTransform {
+
+    private static final long serialVersionUID = 1L;
+
+    public ConcatWsTransform(List<Object> inputs) {
+        super(inputs);
+        checkArgument(inputs.size() >= 2);
+    }
+
+    @Override
+    public BinaryString transform(List<BinaryString> inputs) {
+        BinaryString separator = inputs.get(0);
+        return BinaryString.concatWs(separator, inputs.subList(1, 
inputs.size()));
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java
index a5cf772c0c..0d3b09778b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java
@@ -41,4 +41,9 @@ public class LeafPredicateExtractor implements 
PredicateVisitor<Map<String, Leaf
         }
         return Collections.emptyMap();
     }
+
+    @Override
+    public Map<String, LeafPredicate> visit(TransformPredicate predicate) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/OnlyPartitionKeyEqualVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/OnlyPartitionKeyEqualVisitor.java
index 1eda670db8..f69e6a9cb1 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/OnlyPartitionKeyEqualVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/OnlyPartitionKeyEqualVisitor.java
@@ -121,4 +121,9 @@ public class OnlyPartitionKeyEqualVisitor implements 
FunctionVisitor<Boolean> {
     public Boolean visitOr(List<Boolean> children) {
         return false;
     }
+
+    @Override
+    public Boolean visit(TransformPredicate predicate) {
+        return false;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
index b859c7a56c..fa180aa2f4 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java
@@ -45,4 +45,17 @@ public class PartitionPredicateVisitor implements 
PredicateVisitor<Boolean> {
         }
         return true;
     }
+
+    @Override
+    public Boolean visit(TransformPredicate predicate) {
+        Transform transform = predicate.transform();
+        for (Object input : transform.inputs()) {
+            if (input instanceof FieldRef) {
+                if (!partitionKeys.contains(((FieldRef) input).name())) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
index 8bd627f39a..f93e5123e9 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java
@@ -63,4 +63,9 @@ public class PredicateProjectionConverter implements 
PredicateVisitor<Optional<P
         }
         return Optional.of(new CompoundPredicate(predicate.function(), 
converted));
     }
+
+    @Override
+    public Optional<Predicate> visit(TransformPredicate predicate) {
+        throw new UnsupportedOperationException("TODO");
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
index f5bc0cf679..87dc84b561 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Optional;
 
 /** A {@link PredicateVisitor} to replace {@link Predicate}. */
+@FunctionalInterface
 public interface PredicateReplaceVisitor extends 
PredicateVisitor<Optional<Predicate>> {
 
     @Override
@@ -38,4 +39,9 @@ public interface PredicateReplaceVisitor extends 
PredicateVisitor<Optional<Predi
         }
         return Optional.of(new CompoundPredicate(predicate.function(), 
converted));
     }
+
+    @Override
+    default Optional<Predicate> visit(TransformPredicate predicate) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
index c741c05d46..9c0efe5c85 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
@@ -24,4 +24,6 @@ public interface PredicateVisitor<T> {
     T visit(LeafPredicate predicate);
 
     T visit(CompoundPredicate predicate);
+
+    T visit(TransformPredicate predicate);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/StringTransform.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/StringTransform.java
new file mode 100644
index 0000000000..c01d3bcb28
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/StringTransform.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** {@link Transform} with string inputs and string output. */
+public abstract class StringTransform implements Transform {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<Object> inputs;
+
+    public StringTransform(List<Object> inputs) {
+        this.inputs = inputs;
+        for (Object input : inputs) {
+            if (input == null) {
+                continue;
+            }
+            if (input instanceof FieldRef) {
+                FieldRef ref = (FieldRef) input;
+                checkArgument(ref.type().is(CHARACTER_STRING));
+            } else {
+                checkArgument(input instanceof BinaryString);
+            }
+        }
+    }
+
+    @Override
+    public final List<Object> inputs() {
+        return inputs;
+    }
+
+    @Override
+    public final DataType outputType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public final Object transform(InternalRow row) {
+        List<BinaryString> strings = new ArrayList<>(inputs.size());
+        for (Object input : inputs) {
+            if (input instanceof FieldRef) {
+                FieldRef ref = (FieldRef) input;
+                int i = ref.index();
+                strings.add(row.isNullAt(i) ? null : row.getString(i));
+            } else {
+                strings.add((BinaryString) input);
+            }
+        }
+        return transform(strings);
+    }
+
+    protected abstract BinaryString transform(List<BinaryString> inputs);
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StringTransform that = (StringTransform) o;
+        return Objects.equals(inputs, that.inputs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(inputs);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{" + "inputs=" + inputs + '}';
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
similarity index 72%
copy from 
paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
copy to paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
index c741c05d46..3ab5c97e07 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
@@ -18,10 +18,18 @@
 
 package org.apache.paimon.predicate;
 
-/** A visitor to visit {@link Predicate}. */
-public interface PredicateVisitor<T> {
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
 
-    T visit(LeafPredicate predicate);
+import java.io.Serializable;
+import java.util.List;
 
-    T visit(CompoundPredicate predicate);
+/** Represents a transform function. */
+public interface Transform extends Serializable {
+
+    List<Object> inputs();
+
+    DataType outputType();
+
+    Object transform(InternalRow row);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java
new file mode 100644
index 0000000000..266bb517d9
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.ListSerializer;
+import org.apache.paimon.data.serializer.NullableSerializer;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** A {@link Predicate} with {@link Transform}. */
+public class TransformPredicate implements Predicate {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Transform transform;
+    private final LeafFunction function;
+    private transient List<Object> literals;
+
+    public TransformPredicate(Transform transform, LeafFunction function, 
List<Object> literals) {
+        this.transform = transform;
+        this.function = function;
+        this.literals = literals;
+    }
+
+    public Transform transform() {
+        return transform;
+    }
+
+    @Override
+    public boolean test(InternalRow row) {
+        Object value = transform.transform(row);
+        return function.test(transform.outputType(), value, literals);
+    }
+
+    @Override
+    public boolean test(
+            long rowCount, InternalRow minValues, InternalRow maxValues, 
InternalArray nullCounts) {
+        return true;
+    }
+
+    @Override
+    public Optional<Predicate> negate() {
+        return Optional.empty();
+    }
+
+    @Override
+    public <T> T visit(PredicateVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TransformPredicate that = (TransformPredicate) o;
+        return Objects.equals(transform, that.transform)
+                && Objects.equals(function, that.function)
+                && Objects.equals(literals, that.literals);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(transform, function, literals);
+    }
+
+    @Override
+    public String toString() {
+        return "TransformPredicate{"
+                + "transform="
+                + transform
+                + ", function="
+                + function
+                + ", literals="
+                + literals
+                + '}';
+    }
+
+    private ListSerializer<Object> objectsSerializer() {
+        return new ListSerializer<>(
+                NullableSerializer.wrapIfNullIsNotSupported(
+                        InternalSerializers.create(transform.outputType())));
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        objectsSerializer().serialize(literals, new 
DataOutputViewStreamWrapper(out));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        in.defaultReadObject();
+        literals = objectsSerializer().deserialize(new 
DataInputViewStreamWrapper(in));
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java 
b/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
index c574ae8bea..2c22f00c30 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BinaryStringTest.java
@@ -35,6 +35,8 @@ import java.util.Random;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.paimon.data.BinaryString.EMPTY_UTF8;
 import static org.apache.paimon.data.BinaryString.blankString;
+import static org.apache.paimon.data.BinaryString.concat;
+import static org.apache.paimon.data.BinaryString.concatWs;
 import static org.apache.paimon.data.BinaryString.fromBytes;
 import static org.apache.paimon.utils.DecimalUtils.castFrom;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -221,6 +223,42 @@ public class BinaryStringTest {
         assertThat(binaryString2.compareTo(binaryString1)).isEqualTo(1);
     }
 
+    @TestTemplate
+    void concatTest() {
+        assertThat(concat()).isEqualTo(EMPTY_UTF8);
+        assertThat(concat((BinaryString) null)).isNull();
+        assertThat(concat(EMPTY_UTF8)).isEqualTo(EMPTY_UTF8);
+        assertThat(concat(fromString("ab"))).isEqualTo(fromString("ab"));
+        assertThat(concat(fromString("a"), 
fromString("b"))).isEqualTo(fromString("ab"));
+        assertThat(concat(fromString("a"), fromString("b"), fromString("c")))
+                .isEqualTo(fromString("abc"));
+        assertThat(concat(fromString("a"), null, fromString("c"))).isNull();
+        assertThat(concat(fromString("a"), null, null)).isNull();
+        assertThat(concat(null, null, null)).isNull();
+        assertThat(concat(fromString("数据"), 
fromString("砖头"))).isEqualTo(fromString("数据砖头"));
+    }
+
+    @TestTemplate
+    void concatWsTest() {
+        // Returns null if the separator is null
+        assertThat(concatWs(null, (BinaryString) null)).isNull();
+        assertThat(concatWs(null, fromString("a"))).isNull();
+
+        // If separator is not null, concatWs should skip all null inputs and 
never return null.
+        BinaryString sep = fromString("哈哈");
+        assertThat(concatWs(sep, EMPTY_UTF8)).isEqualTo(EMPTY_UTF8);
+        assertThat(concatWs(sep, 
fromString("ab"))).isEqualTo(fromString("ab"));
+        assertThat(concatWs(sep, fromString("a"), 
fromString("b"))).isEqualTo(fromString("a哈哈b"));
+        assertThat(concatWs(sep, fromString("a"), fromString("b"), 
fromString("c")))
+                .isEqualTo(fromString("a哈哈b哈哈c"));
+        assertThat(concatWs(sep, fromString("a"), null, fromString("c")))
+                .isEqualTo(fromString("a哈哈c"));
+        assertThat(concatWs(sep, fromString("a"), null, 
null)).isEqualTo(fromString("a"));
+        assertThat(concatWs(sep, null, null, null)).isEqualTo(EMPTY_UTF8);
+        assertThat(concatWs(sep, fromString("数据"), fromString("砖头")))
+                .isEqualTo(fromString("数据哈哈砖头"));
+    }
+
     @TestTemplate
     public void contains() {
         assertThat(EMPTY_UTF8.contains(EMPTY_UTF8)).isTrue();
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/predicate/ConcatTransformTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/predicate/ConcatTransformTest.java
new file mode 100644
index 0000000000..e776040f89
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/predicate/ConcatTransformTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ConcatTransformTest {
+
+    @Test
+    public void testConcatLiteralInputs() {
+        List<Object> inputs = new ArrayList<>();
+        inputs.add(BinaryString.fromString("ha"));
+        inputs.add(BinaryString.fromString("-he"));
+        ConcatTransform transform = new ConcatTransform(inputs);
+        Object result = transform.transform(GenericRow.of());
+        assertThat(result).isEqualTo(BinaryString.fromString("ha-he"));
+    }
+
+    @Test
+    public void testConcatRefInputs() {
+        List<Object> inputs = new ArrayList<>();
+        inputs.add(new FieldRef(1, "f1", DataTypes.STRING()));
+        inputs.add(new FieldRef(3, "f3", DataTypes.STRING()));
+        ConcatTransform transform = new ConcatTransform(inputs);
+        Object result =
+                transform.transform(
+                        GenericRow.of(
+                                BinaryString.fromString(""),
+                                BinaryString.fromString("ha"),
+                                BinaryString.fromString(""),
+                                BinaryString.fromString("-he")));
+        assertThat(result).isEqualTo(BinaryString.fromString("ha-he"));
+    }
+
+    @Test
+    public void testConcatHybridInputs() {
+        List<Object> inputs = new ArrayList<>();
+        inputs.add(BinaryString.fromString("ha"));
+        inputs.add(new FieldRef(3, "f3", DataTypes.STRING()));
+        ConcatTransform transform = new ConcatTransform(inputs);
+        Object result =
+                transform.transform(
+                        GenericRow.of(
+                                BinaryString.fromString(""),
+                                BinaryString.fromString(""),
+                                BinaryString.fromString(""),
+                                BinaryString.fromString("-he")));
+        assertThat(result).isEqualTo(BinaryString.fromString("ha-he"));
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/predicate/ConcatWsTransformTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/predicate/ConcatWsTransformTest.java
new file mode 100644
index 0000000000..f85362fd41
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/predicate/ConcatWsTransformTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ConcatWsTransform}: the first input acts as the separator of the rest. */
+class ConcatWsTransformTest {
+
+    /** All three inputs are literals, so an empty row is passed to the transform. */
+    @Test
+    public void testConcatWsLiteralInputs() {
+        List<Object> operands = new ArrayList<>();
+        operands.add(BinaryString.fromString("-"));
+        operands.add(BinaryString.fromString("ha"));
+        operands.add(BinaryString.fromString("he"));
+        GenericRow emptyRow = GenericRow.of();
+        Object actual = new ConcatWsTransform(operands).transform(emptyRow);
+        assertThat(actual).isEqualTo(BinaryString.fromString("ha-he"));
+    }
+
+    /** The separator is read from f2 and the joined values from f1 and f3. */
+    @Test
+    public void testConcatWsRefInputs() {
+        List<Object> operands = new ArrayList<>();
+        operands.add(new FieldRef(2, "f2", DataTypes.STRING()));
+        operands.add(new FieldRef(1, "f1", DataTypes.STRING()));
+        operands.add(new FieldRef(3, "f3", DataTypes.STRING()));
+        BinaryString pad0 = BinaryString.fromString("");
+        BinaryString head = BinaryString.fromString("ha");
+        BinaryString separator = BinaryString.fromString("-");
+        BinaryString tail = BinaryString.fromString("he");
+        GenericRow row = GenericRow.of(pad0, head, separator, tail);
+        Object actual = new ConcatWsTransform(operands).transform(row);
+        assertThat(actual).isEqualTo(BinaryString.fromString("ha-he"));
+    }
+
+    /** Literal separator and first value, with the second value read from row field 3. */
+    @Test
+    public void testConcatWsHybridInputs() {
+        List<Object> operands = new ArrayList<>();
+        operands.add(BinaryString.fromString("-"));
+        operands.add(BinaryString.fromString("ha"));
+        operands.add(new FieldRef(3, "f3", DataTypes.STRING()));
+        BinaryString pad0 = BinaryString.fromString("");
+        BinaryString pad1 = BinaryString.fromString("");
+        BinaryString pad2 = BinaryString.fromString("");
+        BinaryString tail = BinaryString.fromString("he");
+        GenericRow row = GenericRow.of(pad0, pad1, pad2, tail);
+        Object actual = new ConcatWsTransform(operands).transform(row);
+        assertThat(actual).isEqualTo(BinaryString.fromString("ha-he"));
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/TransformPredicateTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/TransformPredicateTest.java
new file mode 100644
index 0000000000..90d144f8d0
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/predicate/TransformPredicateTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TransformPredicate} using {@code concat(f0, f1) = 'ha-he'}. */
+class TransformPredicateTest {
+
+    /** A row whose two fields concatenate to the literal passes the predicate. */
+    @Test
+    public void testReturnTrue() {
+        GenericRow row =
+                GenericRow.of(BinaryString.fromString("ha"), BinaryString.fromString("-he"));
+        assertThat(create().test(row)).isTrue();
+    }
+
+    /** A row whose two fields concatenate to a different string is rejected. */
+    @Test
+    public void testReturnFalse() {
+        GenericRow row =
+                GenericRow.of(BinaryString.fromString("he"), BinaryString.fromString("-he"));
+        assertThat(create().test(row)).isFalse();
+    }
+
+    /** {@code test(1, null, null, null)} is expected to return true. */
+    @Test
+    public void testMinMax() {
+        TransformPredicate predicate = create();
+        boolean result = predicate.test(1, null, null, null);
+        assertThat(result).isTrue();
+    }
+
+    /** A copy made by {@link InstantiationUtil#clone} keeps equals, hashCode and toString. */
+    @Test
+    public void testClass() throws IOException, ClassNotFoundException {
+        TransformPredicate predicate = create();
+        TransformPredicate clone = InstantiationUtil.clone(predicate);
+        assertThat(clone).isEqualTo(predicate);
+        assertThat(clone.hashCode()).isEqualTo(predicate.hashCode());
+        assertThat(clone.toString()).isEqualTo(predicate.toString());
+    }
+
+    /** Builds the predicate {@code concat(f0, f1) = 'ha-he'} shared by all tests. */
+    private TransformPredicate create() {
+        List<Object> inputs = new ArrayList<>();
+        inputs.add(new FieldRef(0, "f0", DataTypes.STRING()));
+        inputs.add(new FieldRef(1, "f1", DataTypes.STRING()));
+        ConcatTransform transform = new ConcatTransform(inputs);
+        List<Object> literals = new ArrayList<>();
+        literals.add(BinaryString.fromString("ha-he"));
+        return new TransformPredicate(transform, Equal.INSTANCE, literals);
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcPredicateFunctionVisitor.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcPredicateFunctionVisitor.java
index 23612edd71..c458030b64 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcPredicateFunctionVisitor.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcPredicateFunctionVisitor.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.FunctionVisitor;
+import org.apache.paimon.predicate.TransformPredicate;
 import org.apache.paimon.types.DataType;
 
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -238,6 +239,11 @@ public class OrcPredicateFunctionVisitor
         }
     }
 
+    @Override
+    public Optional<OrcFilters.Predicate> visit(TransformPredicate predicate) {
+        return Optional.empty(); // always empty: no OrcFilters.Predicate is built here
+    }
+
     /**
      * Function which takes three arguments.
      *
diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
index cc6be6eca4..cc3f5713af 100644
--- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
+++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.FunctionVisitor;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TransformPredicate;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BinaryType;
@@ -200,6 +201,11 @@ public class ParquetFilters {
 
             throw new UnsupportedOperationException();
         }
+
+        @Override
+        public FilterPredicate visit(TransformPredicate predicate) {
+            throw new UnsupportedOperationException(); // never yields a FilterPredicate
+        }
     }
 
     private static <T> Set<T> convertSets(List<Object> values, Class<T> kclass) {


Reply via email to