This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5d97e493e [paimon] Paimon source supports filter push down (#1523)
5d97e493e is described below
commit 5d97e493e0ea1cd05b8a00df03c86880e4e6e495
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 12 14:51:56 2025 +0800
[paimon] Paimon source supports filter push down (#1523)
---
.../fluss/lake/paimon/source/PaimonLakeSource.java | 33 +++-
.../lake/paimon/source/PaimonRecordReader.java | 2 +-
.../lake/paimon/source/PaimonSplitPlanner.java | 4 +-
.../utils/FlussToPaimonPredicateConverter.java | 165 ++++++++++++++++++
.../lake/paimon/source/PaimonLakeSourceTest.java | 192 +++++++++++++++++++++
.../lake/paimon/source/PaimonRecordReaderTest.java | 2 +-
.../lake/paimon/source/PaimonSourceTestBase.java | 2 +-
.../lake/paimon/source/PaimonSplitPlannerTest.java | 5 +-
.../paimon/source/PaimonSplitSerializerTest.java | 9 +-
.../fluss/lake/paimon/source/PaimonSplitTest.java | 5 +-
.../utils/FlussToPaimonPredicateConverterTest.java | 140 +++++++++++++++
11 files changed, 539 insertions(+), 20 deletions(-)
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
index a41f5a2fc..2d9462e52 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
@@ -19,6 +19,7 @@
package com.alibaba.fluss.lake.paimon.source;
import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.paimon.utils.FlussToPaimonPredicateConverter;
import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.source.Planner;
@@ -30,13 +31,16 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
@@ -69,7 +73,23 @@ public class PaimonLakeSource implements
LakeSource<PaimonSplit> {
@Override
public FilterPushDownResult withFilters(List<Predicate> predicates) {
- return FilterPushDownResult.of(Collections.emptyList(), predicates);
+ List<Predicate> unConsumedPredicates = new ArrayList<>();
+ List<Predicate> consumedPredicates = new ArrayList<>();
+ List<org.apache.paimon.predicate.Predicate> converted = new
ArrayList<>();
+ for (Predicate predicate : predicates) {
+ Optional<org.apache.paimon.predicate.Predicate> optPredicate =
+
FlussToPaimonPredicateConverter.convert(getRowType(tablePath), predicate);
+ if (optPredicate.isPresent()) {
+ consumedPredicates.add(predicate);
+ converted.add(optPredicate.get());
+ } else {
+ unConsumedPredicates.add(predicate);
+ }
+ }
+ if (!converted.isEmpty()) {
+ predicate = PredicateBuilder.and(converted);
+ }
+ return FilterPushDownResult.of(consumedPredicates,
unConsumedPredicates);
}
@Override
@@ -107,4 +127,13 @@ public class PaimonLakeSource implements
LakeSource<PaimonSplit> {
private FileStoreTable getTable(Catalog catalog, TablePath tablePath)
throws Exception {
return (FileStoreTable) catalog.getTable(toPaimon(tablePath));
}
+
+ private RowType getRowType(TablePath tablePath) {
+ try (Catalog catalog = getCatalog()) {
+ FileStoreTable fileStoreTable = getTable(catalog, tablePath);
+ return fileStoreTable.rowType();
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to get row type of " + tablePath,
e);
+ }
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
index 1ace3dfcd..d58337cb6 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
@@ -67,7 +67,7 @@ public class PaimonRecordReader implements RecordReader {
readBuilder.withFilter(predicate);
}
- TableRead tableRead = readBuilder.newRead();
+ TableRead tableRead = readBuilder.newRead().executeFilter();
paimonRowType = readBuilder.readType();
org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
index e481d8102..0371a0159 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
@@ -66,8 +66,10 @@ public class PaimonSplitPlanner implements
Planner<PaimonSplit> {
List<PaimonSplit> splits = new ArrayList<>();
try (Catalog catalog = getCatalog()) {
FileStoreTable fileStoreTable = getTable(catalog, tablePath,
snapshotId);
- // TODO: support filter .withFilter(predicate)
InnerTableScan tableScan = fileStoreTable.newScan();
+ if (predicate != null) {
+ tableScan = tableScan.withFilter(predicate);
+ }
for (Split split : tableScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
splits.add(new PaimonSplit(dataSplit));
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
new file mode 100644
index 000000000..e2f28b302
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.utils;
+
+import com.alibaba.fluss.predicate.And;
+import com.alibaba.fluss.predicate.CompoundPredicate;
+import com.alibaba.fluss.predicate.FieldRef;
+import com.alibaba.fluss.predicate.FunctionVisitor;
+import com.alibaba.fluss.predicate.LeafPredicate;
+import com.alibaba.fluss.predicate.Or;
+import com.alibaba.fluss.predicate.PredicateVisitor;
+
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Converts a Fluss {@link com.alibaba.fluss.predicate.Predicate} into a
Paimon {@link Predicate}.
+ *
+ * <p>This class implements the {@link PredicateVisitor} pattern to traverse a
tree of Fluss
+ * predicates. It handles both leaf-level conditions (like equals, greater
than) and compound
+ * conditions (AND, OR).
+ */
+public class FlussToPaimonPredicateConverter implements
PredicateVisitor<Predicate> {
+
+ private final PredicateBuilder builder;
+ private final LeafFunctionConverter converter = new
LeafFunctionConverter();
+
+ public FlussToPaimonPredicateConverter(RowType rowType) {
+ this.builder = new PredicateBuilder(rowType);
+ }
+
+ public static Optional<Predicate> convert(
+ RowType rowType, com.alibaba.fluss.predicate.Predicate
flussPredicate) {
+ try {
+ return Optional.of(flussPredicate.visit(new
FlussToPaimonPredicateConverter(rowType)));
+ } catch (UnsupportedOperationException e) {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Predicate visit(LeafPredicate predicate) {
+ // Delegate the conversion of the specific function to a dedicated
visitor.
+ // This avoids a long chain of 'if-instanceof' checks.
+ return predicate.visit(converter);
+ }
+
+ @Override
+ public Predicate visit(CompoundPredicate predicate) {
+ List<Predicate> children =
+ predicate.children().stream().map(p ->
p.visit(this)).collect(Collectors.toList());
+ CompoundPredicate.Function function = predicate.function();
+ if (function instanceof And) {
+ return PredicateBuilder.and(children);
+ } else if (function instanceof Or) {
+ return PredicateBuilder.or(children);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported fluss compound predicate function: " +
predicate.function());
+ }
+ }
+
+ /**
+ * A visitor that implements the logic to convert each type of {@link
+ * com.alibaba.fluss.predicate.LeafFunction} to a Paimon {@link Predicate}.
+ */
+ private class LeafFunctionConverter implements FunctionVisitor<Predicate> {
+
+ @Override
+ public Predicate visitIsNotNull(FieldRef fieldRef) {
+ return builder.isNotNull(fieldRef.index());
+ }
+
+ @Override
+ public Predicate visitIsNull(FieldRef fieldRef) {
+ return builder.isNull(fieldRef.index());
+ }
+
+ @Override
+ public Predicate visitStartsWith(FieldRef fieldRef, Object literal) {
+ return builder.startsWith(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitEndsWith(FieldRef fieldRef, Object literal) {
+ return builder.endsWith(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitContains(FieldRef fieldRef, Object literal) {
+ return builder.contains(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitLessThan(FieldRef fieldRef, Object literal) {
+ return builder.lessThan(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitGreaterOrEqual(FieldRef fieldRef, Object
literal) {
+ return builder.greaterOrEqual(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitNotEqual(FieldRef fieldRef, Object literal) {
+ return builder.notEqual(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitLessOrEqual(FieldRef fieldRef, Object literal) {
+ return builder.lessOrEqual(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitEqual(FieldRef fieldRef, Object literal) {
+ return builder.equal(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitGreaterThan(FieldRef fieldRef, Object literal) {
+ return builder.greaterThan(fieldRef.index(), literal);
+ }
+
+ @Override
+ public Predicate visitIn(FieldRef fieldRef, List<Object> literals) {
+ return builder.in(fieldRef.index(), literals);
+ }
+
+ @Override
+ public Predicate visitNotIn(FieldRef fieldRef, List<Object> literals) {
+ return builder.notIn(fieldRef.index(), literals);
+ }
+
+ @Override
+ public Predicate visitAnd(List<Predicate> children) {
+            // shouldn't reach here: AND is handled as a CompoundPredicate
+ throw new UnsupportedOperationException("Unsupported visitAnd
method.");
+ }
+
+ @Override
+ public Predicate visitOr(List<Predicate> children) {
+            // shouldn't reach here: OR is handled as a CompoundPredicate
+ throw new UnsupportedOperationException("Unsupported visitOr
method.");
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
new file mode 100644
index 000000000..6b2c7ed56
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.predicate.FieldRef;
+import com.alibaba.fluss.predicate.FunctionVisitor;
+import com.alibaba.fluss.predicate.LeafFunction;
+import com.alibaba.fluss.predicate.LeafPredicate;
+import com.alibaba.fluss.predicate.Predicate;
+import com.alibaba.fluss.predicate.PredicateBuilder;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.schema.Schema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT for {@link PaimonLakeSource}. */
+class PaimonLakeSourceTest extends PaimonSourceTestBase {
+
+ private static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column("id", org.apache.paimon.types.DataTypes.INT())
+ .column("name", org.apache.paimon.types.DataTypes.STRING())
+ .column("__bucket",
org.apache.paimon.types.DataTypes.INT())
+ .column("__offset",
org.apache.paimon.types.DataTypes.BIGINT())
+ .column("__timestamp",
org.apache.paimon.types.DataTypes.TIMESTAMP(6))
+ .primaryKey("id")
+ .option(CoreOptions.BUCKET.key(), "1")
+ .build();
+
+ private static final PredicateBuilder FLUSS_BUILDER =
+ new PredicateBuilder(RowType.of(DataTypes.BIGINT(),
DataTypes.STRING()));
+
+ @BeforeAll
+ protected static void beforeAll() {
+ PaimonSourceTestBase.beforeAll();
+ }
+
+ @Test
+ void testWithFilters() throws Exception {
+ TablePath tablePath = TablePath.of("fluss", "test_filters");
+ createTable(tablePath, SCHEMA);
+
+ // write some rows
+ List<InternalRow> rows = new ArrayList<>();
+ for (int i = 1; i <= 4; i++) {
+ rows.add(
+ GenericRow.of(
+ i,
+ BinaryString.fromString("name" + i),
+ 0,
+ (long) i,
+
Timestamp.fromEpochMillis(System.currentTimeMillis())));
+ }
+ writeRecord(tablePath, rows);
+
+ // write some rows again
+ rows = new ArrayList<>();
+ for (int i = 10; i <= 14; i++) {
+ rows.add(
+ GenericRow.of(
+ i,
+ BinaryString.fromString("name" + i),
+ 0,
+ (long) i,
+
Timestamp.fromEpochMillis(System.currentTimeMillis())));
+ }
+ writeRecord(tablePath, rows);
+
+ // test all filter can be accepted
+ Predicate filter1 = FLUSS_BUILDER.greaterOrEqual(0, 2);
+ Predicate filter2 = FLUSS_BUILDER.lessOrEqual(0, 3);
+ List<Predicate> allFilters = Arrays.asList(filter1, filter2);
+
+ LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ LakeSource.FilterPushDownResult filterPushDownResult =
lakeSource.withFilters(allFilters);
+
assertThat(filterPushDownResult.acceptedPredicates()).isEqualTo(allFilters);
+ assertThat(filterPushDownResult.remainingPredicates()).isEmpty();
+
+ // read data to verify the filters work
+ List<PaimonSplit> paimonSplits = lakeSource.createPlanner(() ->
2).plan();
+ assertThat(paimonSplits).hasSize(1);
+ PaimonSplit paimonSplit = paimonSplits.get(0);
+        // make sure only one data file remains after filtering, verifying that
+        // the plan actually makes use of the pushed-down filters
+ assertThat(paimonSplit.dataSplit().dataFiles()).hasSize(1);
+
+        // read data with filter to make sure the reader with filter works
properly
+ List<Row> actual = new ArrayList<>();
+ com.alibaba.fluss.row.InternalRow.FieldGetter[] fieldGetters =
+ com.alibaba.fluss.row.InternalRow.createFieldGetters(
+ RowType.of(new IntType(), new StringType()));
+ RecordReader recordReader = lakeSource.createRecordReader(() ->
paimonSplit);
+ try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
+ actual.addAll(
+ convertToFlinkRow(
+ fieldGetters,
+ TransformingCloseableIterator.transform(iterator,
LogRecord::getRow)));
+ }
+ assertThat(actual.toString()).isEqualTo("[+I[2, name2], +I[3,
name3]]");
+
+ // test mix one unaccepted filter
+ Predicate nonConvertibleFilter =
+ new LeafPredicate(
+ new UnSupportFilterFunction(),
+ DataTypes.INT(),
+ 0,
+ "f1",
+ Collections.emptyList());
+ allFilters = Arrays.asList(nonConvertibleFilter, filter1, filter2);
+
+ filterPushDownResult = lakeSource.withFilters(allFilters);
+ assertThat(filterPushDownResult.acceptedPredicates().toString())
+ .isEqualTo(Arrays.asList(filter1, filter2).toString());
+ assertThat(filterPushDownResult.remainingPredicates().toString())
+
.isEqualTo(Collections.singleton(nonConvertibleFilter).toString());
+
+ // test all are unaccepted filter
+ allFilters = Arrays.asList(nonConvertibleFilter, nonConvertibleFilter);
+ filterPushDownResult = lakeSource.withFilters(allFilters);
+ assertThat(filterPushDownResult.acceptedPredicates()).isEmpty();
+ assertThat(filterPushDownResult.remainingPredicates().toString())
+ .isEqualTo(allFilters.toString());
+ }
+
+ private static class UnSupportFilterFunction extends LeafFunction {
+
+ @Override
+ public boolean test(DataType type, Object field, List<Object>
literals) {
+ return false;
+ }
+
+ @Override
+ public boolean test(
+ DataType type,
+ long rowCount,
+ Object min,
+ Object max,
+ Long nullCount,
+ List<Object> literals) {
+ return false;
+ }
+
+ @Override
+ public Optional<LeafFunction> negate() {
+ return Optional.empty();
+ }
+
+ @Override
+ public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef,
List<Object> literals) {
+ throw new UnsupportedOperationException(
+ "Unsupported filter function for test purpose.");
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
index eb9b7a1ca..e29560298 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
@@ -117,7 +117,7 @@ class PaimonRecordReaderTest extends PaimonSourceTestBase {
@Test
void testReadLogTableWithProject() throws Exception {
// first of all, create table and prepare data
- String tableName = "logTable_non_partitioned";
+ String tableName = "logTable_non_partitioned_with_project";
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
index 93c73d329..9d0961152 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
@@ -73,7 +73,7 @@ class PaimonSourceTestBase {
public void writeRecord(TablePath tablePath, List<InternalRow> records)
throws Exception {
Table table = getTable(tablePath);
- BatchWriteBuilder writeBuilder =
table.newBatchWriteBuilder().withOverwrite();
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
try (BatchTableWrite writer = writeBuilder.newWrite()) {
for (InternalRow record : records) {
writer.write(record);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
index 0d9e509fe..293248fc7 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
@@ -25,7 +25,6 @@ import com.alibaba.fluss.metadata.TablePath;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.schema.Schema;
@@ -58,9 +57,7 @@ class PaimonSplitPlannerTest extends PaimonSourceTestBase {
builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
builder.option(CoreOptions.BUCKET_KEY.key(), "c1");
createTable(tablePath, builder.build());
- Table table =
- paimonCatalog.getTable(
- Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName()));
+ Table table = getTable(tablePath);
GenericRow record1 =
GenericRow.of(12, BinaryString.fromString("a"),
BinaryString.fromString("A"));
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
index 9f07509be..daba334fe 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
@@ -25,7 +25,6 @@ import com.alibaba.fluss.metadata.TablePath;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.schema.Schema;
@@ -34,7 +33,7 @@ import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -58,13 +57,11 @@ class PaimonSplitSerializerTest extends
PaimonSourceTestBase {
builder.primaryKey("c1", "c3");
builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
createTable(tablePath, builder.build());
- Table table =
- paimonCatalog.getTable(
- Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName()));
+ Table table = getTable(tablePath);
GenericRow record1 =
GenericRow.of(12, BinaryString.fromString("a"),
BinaryString.fromString("A"));
- writeRecord(tablePath, Arrays.asList(record1));
+ writeRecord(tablePath, Collections.singletonList(record1));
Snapshot snapshot = table.latestSnapshot().get();
LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
index b0139d1d8..4b499d521 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
@@ -25,7 +25,6 @@ import com.alibaba.fluss.metadata.TablePath;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.schema.Schema;
@@ -58,9 +57,7 @@ class PaimonSplitTest extends PaimonSourceTestBase {
builder.primaryKey("c1", "c3");
builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
createTable(tablePath, builder.build());
- Table table =
- paimonCatalog.getTable(
- Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName()));
+ Table table = getTable(tablePath);
GenericRow record1 =
GenericRow.of(12, BinaryString.fromString("a"),
BinaryString.fromString("A"));
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
new file mode 100644
index 000000000..5afec4177
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.utils;
+
+import com.alibaba.fluss.predicate.Predicate;
+import com.alibaba.fluss.predicate.PredicateBuilder;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FlussToPaimonPredicateConverter}. */
+class FlussToPaimonPredicateConverterTest {
+
+ private static final PredicateBuilder FLUSS_BUILDER =
+ new PredicateBuilder(
+ RowType.builder()
+ .field("f1", DataTypes.BIGINT())
+ .field("f2", DataTypes.DOUBLE())
+ .field("f3", DataTypes.STRING())
+ .build());
+
+ private static final org.apache.paimon.types.RowType PAIMON_ROW_TYPE =
+ org.apache.paimon.types.RowType.builder()
+ .field("f1", org.apache.paimon.types.DataTypes.BIGINT())
+ .field("f2", org.apache.paimon.types.DataTypes.DOUBLE())
+ .field("f3", org.apache.paimon.types.DataTypes.STRING())
+ .build();
+
+ private static final org.apache.paimon.predicate.PredicateBuilder
PAIMON_BUILDER =
+ new org.apache.paimon.predicate.PredicateBuilder(PAIMON_ROW_TYPE);
+
+ public static Stream<Arguments> parameters() {
+ // A comprehensive set of test cases for different predicate types.
+ return Stream.of(
+ // Leaf Predicates
+ Arguments.of(FLUSS_BUILDER.equal(0, 12L),
PAIMON_BUILDER.equal(0, 12L)),
+ Arguments.of(FLUSS_BUILDER.notEqual(2, "test"),
PAIMON_BUILDER.notEqual(2, "test")),
+ Arguments.of(
+ FLUSS_BUILDER.greaterThan(1, 99.9d),
PAIMON_BUILDER.greaterThan(1, 99.9d)),
+ Arguments.of(
+ FLUSS_BUILDER.greaterOrEqual(0, 100L),
+ PAIMON_BUILDER.greaterOrEqual(0, 100L)),
+ Arguments.of(FLUSS_BUILDER.lessThan(1, 0.1d),
PAIMON_BUILDER.lessThan(1, 0.1d)),
+ Arguments.of(FLUSS_BUILDER.lessOrEqual(0, 50L),
PAIMON_BUILDER.lessOrEqual(0, 50L)),
+ Arguments.of(FLUSS_BUILDER.isNull(2),
PAIMON_BUILDER.isNull(2)),
+ Arguments.of(FLUSS_BUILDER.isNotNull(1),
PAIMON_BUILDER.isNotNull(1)),
+ Arguments.of(
+ FLUSS_BUILDER.in(2, Arrays.asList("a", "b", "c")),
+ PAIMON_BUILDER.in(2, Arrays.asList("a", "b", "c"))),
+ Arguments.of(
+ FLUSS_BUILDER.in(
+ 2,
+ Arrays.asList(
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b", "c",
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c")),
+ PAIMON_BUILDER.in(
+ 2,
+ Arrays.asList(
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b", "c",
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c"))),
+ Arguments.of(
+ FLUSS_BUILDER.notIn(
+ 2,
+ Arrays.asList(
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b", "c",
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c")),
+ PAIMON_BUILDER.notIn(
+ 2,
+ Arrays.asList(
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b", "c",
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c"))),
+ Arguments.of(
+ FLUSS_BUILDER.startsWith(2, "start"),
+ PAIMON_BUILDER.startsWith(2, "start")),
+ Arguments.of(FLUSS_BUILDER.endsWith(2, "end"),
PAIMON_BUILDER.endsWith(2, "end")),
+ Arguments.of(FLUSS_BUILDER.contains(2, "mid"),
PAIMON_BUILDER.contains(2, "mid")),
+
+ // Compound Predicates
+ Arguments.of(
+ PredicateBuilder.and(
+ FLUSS_BUILDER.equal(0, 1L),
FLUSS_BUILDER.isNotNull(2)),
+ org.apache.paimon.predicate.PredicateBuilder.and(
+ PAIMON_BUILDER.equal(0, 1L),
PAIMON_BUILDER.isNotNull(2))),
+ Arguments.of(
+ PredicateBuilder.or(
+ FLUSS_BUILDER.lessThan(1, 10.0),
+ FLUSS_BUILDER.greaterThan(1, 100.0)),
+ org.apache.paimon.predicate.PredicateBuilder.or(
+ PAIMON_BUILDER.lessThan(1, 10.0),
+ PAIMON_BUILDER.greaterThan(1, 100.0))),
+
+ // Nested Predicate
+ Arguments.of(
+ PredicateBuilder.and(
+ FLUSS_BUILDER.equal(2, "test"),
+ PredicateBuilder.or(
+ FLUSS_BUILDER.equal(0, 1L),
+ FLUSS_BUILDER.greaterThan(1, 50.0))),
+ org.apache.paimon.predicate.PredicateBuilder.and(
+ PAIMON_BUILDER.equal(2, "test"),
+
org.apache.paimon.predicate.PredicateBuilder.or(
+ PAIMON_BUILDER.equal(0, 1L),
+ PAIMON_BUILDER.greaterThan(1,
50.0)))));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testPredicateConverter(
+ Predicate flussPredicate, org.apache.paimon.predicate.Predicate
expectedPredicate) {
+ org.apache.paimon.predicate.Predicate convertedPaimonPredicate =
+ FlussToPaimonPredicateConverter.convert(PAIMON_ROW_TYPE,
flussPredicate).get();
+
assertThat(convertedPaimonPredicate.toString()).isEqualTo(expectedPredicate.toString());
+ }
+}