This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d97e493e [paimon] Paimon source supports filter push down (#1523)
5d97e493e is described below

commit 5d97e493e0ea1cd05b8a00df03c86880e4e6e495
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 12 14:51:56 2025 +0800

    [paimon] Paimon source supports filter push down (#1523)
---
 .../fluss/lake/paimon/source/PaimonLakeSource.java |  33 +++-
 .../lake/paimon/source/PaimonRecordReader.java     |   2 +-
 .../lake/paimon/source/PaimonSplitPlanner.java     |   4 +-
 .../utils/FlussToPaimonPredicateConverter.java     | 165 ++++++++++++++++++
 .../lake/paimon/source/PaimonLakeSourceTest.java   | 192 +++++++++++++++++++++
 .../lake/paimon/source/PaimonRecordReaderTest.java |   2 +-
 .../lake/paimon/source/PaimonSourceTestBase.java   |   2 +-
 .../lake/paimon/source/PaimonSplitPlannerTest.java |   5 +-
 .../paimon/source/PaimonSplitSerializerTest.java   |   9 +-
 .../fluss/lake/paimon/source/PaimonSplitTest.java  |   5 +-
 .../utils/FlussToPaimonPredicateConverterTest.java | 140 +++++++++++++++
 11 files changed, 539 insertions(+), 20 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
index a41f5a2fc..2d9462e52 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
@@ -19,6 +19,7 @@
 package com.alibaba.fluss.lake.paimon.source;
 
 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.paimon.utils.FlussToPaimonPredicateConverter;
 import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
 import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.source.Planner;
@@ -30,13 +31,16 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
 
@@ -69,7 +73,23 @@ public class PaimonLakeSource implements 
LakeSource<PaimonSplit> {
 
     @Override
     public FilterPushDownResult withFilters(List<Predicate> predicates) {
-        return FilterPushDownResult.of(Collections.emptyList(), predicates);
+        List<Predicate> unConsumedPredicates = new ArrayList<>();
+        List<Predicate> consumedPredicates = new ArrayList<>();
+        List<org.apache.paimon.predicate.Predicate> converted = new 
ArrayList<>();
+        for (Predicate predicate : predicates) {
+            Optional<org.apache.paimon.predicate.Predicate> optPredicate =
+                    
FlussToPaimonPredicateConverter.convert(getRowType(tablePath), predicate);
+            if (optPredicate.isPresent()) {
+                consumedPredicates.add(predicate);
+                converted.add(optPredicate.get());
+            } else {
+                unConsumedPredicates.add(predicate);
+            }
+        }
+        if (!converted.isEmpty()) {
+            predicate = PredicateBuilder.and(converted);
+        }
+        return FilterPushDownResult.of(consumedPredicates, 
unConsumedPredicates);
     }
 
     @Override
@@ -107,4 +127,13 @@ public class PaimonLakeSource implements 
LakeSource<PaimonSplit> {
     private FileStoreTable getTable(Catalog catalog, TablePath tablePath) 
throws Exception {
         return (FileStoreTable) catalog.getTable(toPaimon(tablePath));
     }
+
+    private RowType getRowType(TablePath tablePath) {
+        try (Catalog catalog = getCatalog()) {
+            FileStoreTable fileStoreTable = getTable(catalog, tablePath);
+            return fileStoreTable.rowType();
+        } catch (Exception e) {
+            throw new RuntimeException("Fail to get row type of " + tablePath, 
e);
+        }
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
index 1ace3dfcd..d58337cb6 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
@@ -67,7 +67,7 @@ public class PaimonRecordReader implements RecordReader {
             readBuilder.withFilter(predicate);
         }
 
-        TableRead tableRead = readBuilder.newRead();
+        TableRead tableRead = readBuilder.newRead().executeFilter();
         paimonRowType = readBuilder.readType();
 
         org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
index e481d8102..0371a0159 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
@@ -66,8 +66,10 @@ public class PaimonSplitPlanner implements 
Planner<PaimonSplit> {
             List<PaimonSplit> splits = new ArrayList<>();
             try (Catalog catalog = getCatalog()) {
                 FileStoreTable fileStoreTable = getTable(catalog, tablePath, 
snapshotId);
-                // TODO: support filter .withFilter(predicate)
                 InnerTableScan tableScan = fileStoreTable.newScan();
+                if (predicate != null) {
+                    tableScan = tableScan.withFilter(predicate);
+                }
                 for (Split split : tableScan.plan().splits()) {
                     DataSplit dataSplit = (DataSplit) split;
                     splits.add(new PaimonSplit(dataSplit));
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
new file mode 100644
index 000000000..e2f28b302
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.utils;
+
+import com.alibaba.fluss.predicate.And;
+import com.alibaba.fluss.predicate.CompoundPredicate;
+import com.alibaba.fluss.predicate.FieldRef;
+import com.alibaba.fluss.predicate.FunctionVisitor;
+import com.alibaba.fluss.predicate.LeafPredicate;
+import com.alibaba.fluss.predicate.Or;
+import com.alibaba.fluss.predicate.PredicateVisitor;
+
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Converts a Fluss {@link com.alibaba.fluss.predicate.Predicate} into a 
Paimon {@link Predicate}.
+ *
+ * <p>This class implements the {@link PredicateVisitor} pattern to traverse a 
tree of Fluss
+ * predicates. It handles both leaf-level conditions (like equals, greater 
than) and compound
+ * conditions (AND, OR).
+ */
+public class FlussToPaimonPredicateConverter implements 
PredicateVisitor<Predicate> {
+
+    private final PredicateBuilder builder;
+    private final LeafFunctionConverter converter = new 
LeafFunctionConverter();
+
+    public FlussToPaimonPredicateConverter(RowType rowType) {
+        this.builder = new PredicateBuilder(rowType);
+    }
+
+    public static Optional<Predicate> convert(
+            RowType rowType, com.alibaba.fluss.predicate.Predicate 
flussPredicate) {
+        try {
+            return Optional.of(flussPredicate.visit(new 
FlussToPaimonPredicateConverter(rowType)));
+        } catch (UnsupportedOperationException e) {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public Predicate visit(LeafPredicate predicate) {
+        // Delegate the conversion of the specific function to a dedicated 
visitor.
+        // This avoids a long chain of 'if-instanceof' checks.
+        return predicate.visit(converter);
+    }
+
+    @Override
+    public Predicate visit(CompoundPredicate predicate) {
+        List<Predicate> children =
+                predicate.children().stream().map(p -> 
p.visit(this)).collect(Collectors.toList());
+        CompoundPredicate.Function function = predicate.function();
+        if (function instanceof And) {
+            return PredicateBuilder.and(children);
+        } else if (function instanceof Or) {
+            return PredicateBuilder.or(children);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported fluss compound predicate function: " + 
predicate.function());
+        }
+    }
+
+    /**
+     * A visitor that implements the logic to convert each type of {@link
+     * com.alibaba.fluss.predicate.LeafFunction} to a Paimon {@link Predicate}.
+     */
+    private class LeafFunctionConverter implements FunctionVisitor<Predicate> {
+
+        @Override
+        public Predicate visitIsNotNull(FieldRef fieldRef) {
+            return builder.isNotNull(fieldRef.index());
+        }
+
+        @Override
+        public Predicate visitIsNull(FieldRef fieldRef) {
+            return builder.isNull(fieldRef.index());
+        }
+
+        @Override
+        public Predicate visitStartsWith(FieldRef fieldRef, Object literal) {
+            return builder.startsWith(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitEndsWith(FieldRef fieldRef, Object literal) {
+            return builder.endsWith(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitContains(FieldRef fieldRef, Object literal) {
+            return builder.contains(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitLessThan(FieldRef fieldRef, Object literal) {
+            return builder.lessThan(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitGreaterOrEqual(FieldRef fieldRef, Object 
literal) {
+            return builder.greaterOrEqual(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitNotEqual(FieldRef fieldRef, Object literal) {
+            return builder.notEqual(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitLessOrEqual(FieldRef fieldRef, Object literal) {
+            return builder.lessOrEqual(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitEqual(FieldRef fieldRef, Object literal) {
+            return builder.equal(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitGreaterThan(FieldRef fieldRef, Object literal) {
+            return builder.greaterThan(fieldRef.index(), literal);
+        }
+
+        @Override
+        public Predicate visitIn(FieldRef fieldRef, List<Object> literals) {
+            return builder.in(fieldRef.index(), literals);
+        }
+
+        @Override
+        public Predicate visitNotIn(FieldRef fieldRef, List<Object> literals) {
+            return builder.notIn(fieldRef.index(), literals);
+        }
+
+        @Override
+        public Predicate visitAnd(List<Predicate> children) {
+            // should never be reached
+            throw new UnsupportedOperationException("Unsupported visitAnd 
method.");
+        }
+
+        @Override
+        public Predicate visitOr(List<Predicate> children) {
+            // should never be reached
+            throw new UnsupportedOperationException("Unsupported visitOr 
method.");
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
new file mode 100644
index 000000000..6b2c7ed56
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.predicate.FieldRef;
+import com.alibaba.fluss.predicate.FunctionVisitor;
+import com.alibaba.fluss.predicate.LeafFunction;
+import com.alibaba.fluss.predicate.LeafPredicate;
+import com.alibaba.fluss.predicate.Predicate;
+import com.alibaba.fluss.predicate.PredicateBuilder;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.schema.Schema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT for {@link PaimonLakeSource}. */
+class PaimonLakeSourceTest extends PaimonSourceTestBase {
+
+    private static final Schema SCHEMA =
+            Schema.newBuilder()
+                    .column("id", org.apache.paimon.types.DataTypes.INT())
+                    .column("name", org.apache.paimon.types.DataTypes.STRING())
+                    .column("__bucket", 
org.apache.paimon.types.DataTypes.INT())
+                    .column("__offset", 
org.apache.paimon.types.DataTypes.BIGINT())
+                    .column("__timestamp", 
org.apache.paimon.types.DataTypes.TIMESTAMP(6))
+                    .primaryKey("id")
+                    .option(CoreOptions.BUCKET.key(), "1")
+                    .build();
+
+    private static final PredicateBuilder FLUSS_BUILDER =
+            new PredicateBuilder(RowType.of(DataTypes.BIGINT(), 
DataTypes.STRING()));
+
+    @BeforeAll
+    protected static void beforeAll() {
+        PaimonSourceTestBase.beforeAll();
+    }
+
+    @Test
+    void testWithFilters() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "test_filters");
+        createTable(tablePath, SCHEMA);
+
+        // write some rows
+        List<InternalRow> rows = new ArrayList<>();
+        for (int i = 1; i <= 4; i++) {
+            rows.add(
+                    GenericRow.of(
+                            i,
+                            BinaryString.fromString("name" + i),
+                            0,
+                            (long) i,
+                            
Timestamp.fromEpochMillis(System.currentTimeMillis())));
+        }
+        writeRecord(tablePath, rows);
+
+        // write some rows again
+        rows = new ArrayList<>();
+        for (int i = 10; i <= 14; i++) {
+            rows.add(
+                    GenericRow.of(
+                            i,
+                            BinaryString.fromString("name" + i),
+                            0,
+                            (long) i,
+                            
Timestamp.fromEpochMillis(System.currentTimeMillis())));
+        }
+        writeRecord(tablePath, rows);
+
+        // test that all filters can be accepted
+        Predicate filter1 = FLUSS_BUILDER.greaterOrEqual(0, 2);
+        Predicate filter2 = FLUSS_BUILDER.lessOrEqual(0, 3);
+        List<Predicate> allFilters = Arrays.asList(filter1, filter2);
+
+        LakeSource<PaimonSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        LakeSource.FilterPushDownResult filterPushDownResult = 
lakeSource.withFilters(allFilters);
+        
assertThat(filterPushDownResult.acceptedPredicates()).isEqualTo(allFilters);
+        assertThat(filterPushDownResult.remainingPredicates()).isEmpty();
+
+        // read data to verify the filters work
+        List<PaimonSplit> paimonSplits = lakeSource.createPlanner(() -> 
2).plan();
+        assertThat(paimonSplits).hasSize(1);
+        PaimonSplit paimonSplit = paimonSplits.get(0);
+        // make sure only one data file is left after filtering, to check 
that the plan
+        // makes use of the filters
+        assertThat(paimonSplit.dataSplit().dataFiles()).hasSize(1);
+
+        // read data with filter to make sure the reader with filter works 
properly
+        List<Row> actual = new ArrayList<>();
+        com.alibaba.fluss.row.InternalRow.FieldGetter[] fieldGetters =
+                com.alibaba.fluss.row.InternalRow.createFieldGetters(
+                        RowType.of(new IntType(), new StringType()));
+        RecordReader recordReader = lakeSource.createRecordReader(() -> 
paimonSplit);
+        try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
+            actual.addAll(
+                    convertToFlinkRow(
+                            fieldGetters,
+                            TransformingCloseableIterator.transform(iterator, 
LogRecord::getRow)));
+        }
+        assertThat(actual.toString()).isEqualTo("[+I[2, name2], +I[3, 
name3]]");
+
+        // test a mix that includes one unaccepted filter
+        Predicate nonConvertibleFilter =
+                new LeafPredicate(
+                        new UnSupportFilterFunction(),
+                        DataTypes.INT(),
+                        0,
+                        "f1",
+                        Collections.emptyList());
+        allFilters = Arrays.asList(nonConvertibleFilter, filter1, filter2);
+
+        filterPushDownResult = lakeSource.withFilters(allFilters);
+        assertThat(filterPushDownResult.acceptedPredicates().toString())
+                .isEqualTo(Arrays.asList(filter1, filter2).toString());
+        assertThat(filterPushDownResult.remainingPredicates().toString())
+                
.isEqualTo(Collections.singleton(nonConvertibleFilter).toString());
+
+        // test that all filters are unaccepted
+        allFilters = Arrays.asList(nonConvertibleFilter, nonConvertibleFilter);
+        filterPushDownResult = lakeSource.withFilters(allFilters);
+        assertThat(filterPushDownResult.acceptedPredicates()).isEmpty();
+        assertThat(filterPushDownResult.remainingPredicates().toString())
+                .isEqualTo(allFilters.toString());
+    }
+
+    private static class UnSupportFilterFunction extends LeafFunction {
+
+        @Override
+        public boolean test(DataType type, Object field, List<Object> 
literals) {
+            return false;
+        }
+
+        @Override
+        public boolean test(
+                DataType type,
+                long rowCount,
+                Object min,
+                Object max,
+                Long nullCount,
+                List<Object> literals) {
+            return false;
+        }
+
+        @Override
+        public Optional<LeafFunction> negate() {
+            return Optional.empty();
+        }
+
+        @Override
+        public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, 
List<Object> literals) {
+            throw new UnsupportedOperationException(
+                    "Unsupported filter function for test purpose.");
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
index eb9b7a1ca..e29560298 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
@@ -117,7 +117,7 @@ class PaimonRecordReaderTest extends PaimonSourceTestBase {
     @Test
     void testReadLogTableWithProject() throws Exception {
         // first of all, create table and prepare data
-        String tableName = "logTable_non_partitioned";
+        String tableName = "logTable_non_partitioned_with_project";
 
         TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
 
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
index 93c73d329..9d0961152 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
@@ -73,7 +73,7 @@ class PaimonSourceTestBase {
 
     public void writeRecord(TablePath tablePath, List<InternalRow> records) 
throws Exception {
         Table table = getTable(tablePath);
-        BatchWriteBuilder writeBuilder = 
table.newBatchWriteBuilder().withOverwrite();
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         try (BatchTableWrite writer = writeBuilder.newWrite()) {
             for (InternalRow record : records) {
                 writer.write(record);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
index 0d9e509fe..293248fc7 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
@@ -25,7 +25,6 @@ import com.alibaba.fluss.metadata.TablePath;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.schema.Schema;
@@ -58,9 +57,7 @@ class PaimonSplitPlannerTest extends PaimonSourceTestBase {
         builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
         builder.option(CoreOptions.BUCKET_KEY.key(), "c1");
         createTable(tablePath, builder.build());
-        Table table =
-                paimonCatalog.getTable(
-                        Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName()));
+        Table table = getTable(tablePath);
 
         GenericRow record1 =
                 GenericRow.of(12, BinaryString.fromString("a"), 
BinaryString.fromString("A"));
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
index 9f07509be..daba334fe 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
@@ -25,7 +25,6 @@ import com.alibaba.fluss.metadata.TablePath;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.schema.Schema;
@@ -34,7 +33,7 @@ import org.apache.paimon.types.DataTypes;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -58,13 +57,11 @@ class PaimonSplitSerializerTest extends 
PaimonSourceTestBase {
         builder.primaryKey("c1", "c3");
         builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
         createTable(tablePath, builder.build());
-        Table table =
-                paimonCatalog.getTable(
-                        Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName()));
+        Table table = getTable(tablePath);
 
         GenericRow record1 =
                 GenericRow.of(12, BinaryString.fromString("a"), 
BinaryString.fromString("A"));
-        writeRecord(tablePath, Arrays.asList(record1));
+        writeRecord(tablePath, Collections.singletonList(record1));
         Snapshot snapshot = table.latestSnapshot().get();
 
         LakeSource<PaimonSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
index b0139d1d8..4b499d521 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
@@ -25,7 +25,6 @@ import com.alibaba.fluss.metadata.TablePath;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.schema.Schema;
@@ -58,9 +57,7 @@ class PaimonSplitTest extends PaimonSourceTestBase {
         builder.primaryKey("c1", "c3");
         builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
         createTable(tablePath, builder.build());
-        Table table =
-                paimonCatalog.getTable(
-                        Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName()));
+        Table table = getTable(tablePath);
 
         GenericRow record1 =
                 GenericRow.of(12, BinaryString.fromString("a"), 
BinaryString.fromString("A"));
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
new file mode 100644
index 000000000..5afec4177
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.utils;
+
+import com.alibaba.fluss.predicate.Predicate;
+import com.alibaba.fluss.predicate.PredicateBuilder;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks {@link FlussToPaimonPredicateConverter} output against expected Paimon predicates. */
+class FlussToPaimonPredicateConverterTest {
+
+    private static final PredicateBuilder FLUSS_BUILDER =
+            new PredicateBuilder(
+                    RowType.builder()
+                            .field("f1", DataTypes.BIGINT())
+                            .field("f2", DataTypes.DOUBLE())
+                            .field("f3", DataTypes.STRING())
+                            .build());
+
+    private static final org.apache.paimon.types.RowType PAIMON_ROW_TYPE =
+            org.apache.paimon.types.RowType.builder()
+                    .field("f1", org.apache.paimon.types.DataTypes.BIGINT())
+                    .field("f2", org.apache.paimon.types.DataTypes.DOUBLE())
+                    .field("f3", org.apache.paimon.types.DataTypes.STRING())
+                    .build();
+
+    private static final org.apache.paimon.predicate.PredicateBuilder 
PAIMON_BUILDER =
+            new org.apache.paimon.predicate.PredicateBuilder(PAIMON_ROW_TYPE);
+
+    public static Stream<Arguments> parameters() {
+        // Each entry pairs a Fluss predicate with the Paimon predicate expected after conversion.
+        return Stream.of(
+                // Leaf predicates: comparisons, null checks, IN / NOT IN, and string matching.
+                Arguments.of(FLUSS_BUILDER.equal(0, 12L), 
PAIMON_BUILDER.equal(0, 12L)),
+                Arguments.of(FLUSS_BUILDER.notEqual(2, "test"), 
PAIMON_BUILDER.notEqual(2, "test")),
+                Arguments.of(
+                        FLUSS_BUILDER.greaterThan(1, 99.9d), 
PAIMON_BUILDER.greaterThan(1, 99.9d)),
+                Arguments.of(
+                        FLUSS_BUILDER.greaterOrEqual(0, 100L),
+                        PAIMON_BUILDER.greaterOrEqual(0, 100L)),
+                Arguments.of(FLUSS_BUILDER.lessThan(1, 0.1d), 
PAIMON_BUILDER.lessThan(1, 0.1d)),
+                Arguments.of(FLUSS_BUILDER.lessOrEqual(0, 50L), 
PAIMON_BUILDER.lessOrEqual(0, 50L)),
+                Arguments.of(FLUSS_BUILDER.isNull(2), 
PAIMON_BUILDER.isNull(2)),
+                Arguments.of(FLUSS_BUILDER.isNotNull(1), 
PAIMON_BUILDER.isNotNull(1)),
+                Arguments.of(
+                        FLUSS_BUILDER.in(2, Arrays.asList("a", "b", "c")),
+                        PAIMON_BUILDER.in(2, Arrays.asList("a", "b", "c"))),
+                Arguments.of(
+                        FLUSS_BUILDER.in(
+                                2,
+                                Arrays.asList(
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b", "c",
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                        "c")),
+                        PAIMON_BUILDER.in(
+                                2,
+                                Arrays.asList(
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b", "c",
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                        "c"))),
+                Arguments.of(
+                        FLUSS_BUILDER.notIn(
+                                2,
+                                Arrays.asList(
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b", "c",
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                        "c")),
+                        PAIMON_BUILDER.notIn(
+                                2,
+                                Arrays.asList(
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b", "c",
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                        "c"))),
+                Arguments.of(
+                        FLUSS_BUILDER.startsWith(2, "start"),
+                        PAIMON_BUILDER.startsWith(2, "start")),
+                Arguments.of(FLUSS_BUILDER.endsWith(2, "end"), 
PAIMON_BUILDER.endsWith(2, "end")),
+                Arguments.of(FLUSS_BUILDER.contains(2, "mid"), 
PAIMON_BUILDER.contains(2, "mid")),
+
+                // Compound predicates: a single AND / OR combining two leaf predicates.
+                Arguments.of(
+                        PredicateBuilder.and(
+                                FLUSS_BUILDER.equal(0, 1L), 
FLUSS_BUILDER.isNotNull(2)),
+                        org.apache.paimon.predicate.PredicateBuilder.and(
+                                PAIMON_BUILDER.equal(0, 1L), 
PAIMON_BUILDER.isNotNull(2))),
+                Arguments.of(
+                        PredicateBuilder.or(
+                                FLUSS_BUILDER.lessThan(1, 10.0),
+                                FLUSS_BUILDER.greaterThan(1, 100.0)),
+                        org.apache.paimon.predicate.PredicateBuilder.or(
+                                PAIMON_BUILDER.lessThan(1, 10.0),
+                                PAIMON_BUILDER.greaterThan(1, 100.0))),
+
+                // Nested predicate: an OR placed inside an AND.
+                Arguments.of(
+                        PredicateBuilder.and(
+                                FLUSS_BUILDER.equal(2, "test"),
+                                PredicateBuilder.or(
+                                        FLUSS_BUILDER.equal(0, 1L),
+                                        FLUSS_BUILDER.greaterThan(1, 50.0))),
+                        org.apache.paimon.predicate.PredicateBuilder.and(
+                                PAIMON_BUILDER.equal(2, "test"),
+                                
org.apache.paimon.predicate.PredicateBuilder.or(
+                                        PAIMON_BUILDER.equal(0, 1L),
+                                        PAIMON_BUILDER.greaterThan(1, 
50.0)))));
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testPredicateConverter(
+            Predicate flussPredicate, org.apache.paimon.predicate.Predicate 
expectedPredicate) {
+        org.apache.paimon.predicate.Predicate convertedPaimonPredicate =
+                FlussToPaimonPredicateConverter.convert(PAIMON_ROW_TYPE, 
flussPredicate).get();
+        
assertThat(convertedPaimonPredicate.toString()).isEqualTo(expectedPredicate.toString());
+    }
+}


Reply via email to