This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 7eefe4ab5 [flink] Apply partition filter to lake in flink source 
(#1549)
7eefe4ab5 is described below

commit 7eefe4ab58d4040ddcf3d6aef24910b358b5c54f
Author: SeungMin <[email protected]>
AuthorDate: Wed Aug 20 12:26:37 2025 +0900

    [flink] Apply partition filter to lake in flink source (#1549)
---
 .../fluss/flink/lake/LakeSplitGenerator.java       | 18 +++++++--
 .../fluss/flink/source/FlinkTableSource.java       | 45 ++++++++++++++++++---
 .../source/enumerator/FlinkSourceEnumerator.java   |  9 +++--
 .../utils/FlussToPaimonPredicateConverter.java     | 47 +++++++++++++++++-----
 .../fluss/lake/paimon/utils/PaimonConversions.java | 13 ++++++
 .../paimon/flink/FlinkUnionReadLogTableITCase.java | 26 ++++++++++++
 .../lake/paimon/source/PaimonLakeSourceTest.java   |  4 +-
 .../utils/FlussToPaimonPredicateConverterTest.java | 45 ++++++++++++++-------
 8 files changed, 167 insertions(+), 40 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
index 1d8198a1f..4a8d47952 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
@@ -38,10 +38,13 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static 
com.alibaba.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
 
 /** A generator for lake splits. */
 public class LakeSplitGenerator {
@@ -51,6 +54,7 @@ public class LakeSplitGenerator {
     private final OffsetsInitializer.BucketOffsetsRetriever 
bucketOffsetsRetriever;
     private final OffsetsInitializer stoppingOffsetInitializer;
     private final int bucketCount;
+    private final Supplier<Set<PartitionInfo>> listPartitionSupplier;
 
     private final LakeSource<LakeSplit> lakeSource;
 
@@ -60,13 +64,15 @@ public class LakeSplitGenerator {
             LakeSource<LakeSplit> lakeSource,
             OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
             OffsetsInitializer stoppingOffsetInitializer,
-            int bucketCount) {
+            int bucketCount,
+            Supplier<Set<PartitionInfo>> listPartitionSupplier) {
         this.tableInfo = tableInfo;
         this.flussAdmin = flussAdmin;
         this.lakeSource = lakeSource;
         this.bucketOffsetsRetriever = bucketOffsetsRetriever;
         this.stoppingOffsetInitializer = stoppingOffsetInitializer;
         this.bucketCount = bucketCount;
+        this.listPartitionSupplier = listPartitionSupplier;
     }
 
     public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
@@ -83,9 +89,13 @@ public class LakeSplitGenerator {
                                 .createPlanner(
                                         (LakeSource.PlannerContext) 
lakeSnapshotInfo::getSnapshotId)
                                 .plan());
+
+        if (lakeSplits.isEmpty()) {
+            return Collections.emptyList();
+        }
+
         if (isPartitioned) {
-            List<PartitionInfo> partitionInfos =
-                    
flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
+            Set<PartitionInfo> partitionInfos = listPartitionSupplier.get();
             Map<Long, String> partitionNameById =
                     partitionInfos.stream()
                             .collect(
@@ -109,7 +119,7 @@ public class LakeSplitGenerator {
     private Map<String, Map<Integer, List<LakeSplit>>> 
groupLakeSplits(List<LakeSplit> lakeSplits) {
         Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>();
         for (LakeSplit split : lakeSplits) {
-            String partition = String.join("$", split.partition());
+            String partition = String.join(PARTITION_SPEC_SEPARATOR, 
split.partition());
             int bucket = split.bucket();
             // Get or create the partition group
             Map<Integer, List<LakeSplit>> bucketMap =
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
index deb986a50..15ee21c72 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
@@ -32,6 +32,8 @@ import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.MergeEngineType;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.predicate.Predicate;
+import com.alibaba.fluss.predicate.PredicateBuilder;
 import com.alibaba.fluss.types.RowType;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -66,6 +68,8 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -80,6 +84,7 @@ import java.util.Map;
 
 import static com.alibaba.fluss.flink.utils.LakeSourceUtils.createLakeSource;
 import static 
com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
+import static 
com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
 import static com.alibaba.fluss.flink.utils.PushdownUtils.extractFieldEquals;
 import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
 
@@ -93,6 +98,8 @@ public class FlinkTableSource
                 SupportsLimitPushDown,
                 SupportsAggregatePushDown {
 
+    public static final Logger LOG = 
LoggerFactory.getLogger(FlinkTableSource.class);
+
     private final TablePath tablePath;
     private final Configuration flussConfig;
     // output type before projection pushdown
@@ -404,9 +411,6 @@ public class FlinkTableSource
 
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
-        if (lakeSource != null) {
-            // todo: use real filters
-        }
 
         List<ResolvedExpression> acceptedFilters = new ArrayList<>();
         List<ResolvedExpression> remainingFilters = new ArrayList<>();
@@ -449,11 +453,40 @@ public class FlinkTableSource
                             getPartitionKeyTypes(),
                             acceptedFilters,
                             remainingFilters,
-                            FLINK_INTERNAL_VALUE);
+                            FLUSS_INTERNAL_VALUE);
+
             // partitions are filtered by string representations, convert the 
equals to string first
-            fieldEquals = stringifyFieldEquals(fieldEquals);
+            partitionFilters = stringifyFieldEquals(fieldEquals);
+
+            // lake source is not null
+            if (lakeSource != null) {
+                // and exist field equals, push down to lake source
+                if (!fieldEquals.isEmpty()) {
+                    // convert flink row type to fluss row type
+                    RowType flussRowType = 
FlinkConversions.toFlussRowType(tableOutputType);
+
+                    List<Predicate> lakePredicates = new ArrayList<>();
+                    PredicateBuilder predicateBuilder = new 
PredicateBuilder(flussRowType);
+
+                    for (FieldEqual fieldEqual : fieldEquals) {
+                        lakePredicates.add(
+                                predicateBuilder.equal(
+                                        fieldEqual.fieldIndex, 
fieldEqual.equalValue));
+                    }
 
-            this.partitionFilters = fieldEquals;
+                    if (!lakePredicates.isEmpty()) {
+                        final LakeSource.FilterPushDownResult 
filterPushDownResult =
+                                lakeSource.withFilters(lakePredicates);
+                        if (filterPushDownResult.acceptedPredicates().size()
+                                != lakePredicates.size()) {
+                            LOG.info(
+                                    "LakeSource rejected some partition 
filters. Falling back to Flink-side filtering.");
+                            // Flink will apply all filters to preserve 
correctness
+                            return Result.of(Collections.emptyList(), filters);
+                        }
+                    }
+                }
+            }
             return Result.of(acceptedFilters, remainingFilters);
         } else {
             return Result.of(Collections.emptyList(), filters);
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 05ac7fa6e..3d29cbbf1 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -62,6 +62,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -296,7 +297,7 @@ public class FlinkSourceEnumerator
         try {
             List<PartitionInfo> partitionInfos = 
flussAdmin.listPartitionInfos(tablePath).get();
             partitionInfos = applyPartitionFilter(partitionInfos);
-            return new HashSet<>(partitionInfos);
+            return new LinkedHashSet<>(partitionInfos);
         } catch (Exception e) {
             throw new FlinkRuntimeException(
                     String.format("Failed to list partitions for %s", 
tablePath),
@@ -519,9 +520,9 @@ public class FlinkSourceEnumerator
                         lakeSource,
                         bucketOffsetsRetriever,
                         stoppingOffsetsInitializer,
-                        tableInfo.getNumBuckets());
-        List<SourceSplitBase> lakeSplits = 
lakeSplitGenerator.generateHybridLakeSplits();
-        return lakeSplits;
+                        tableInfo.getNumBuckets(),
+                        this::listPartitions);
+        return lakeSplitGenerator.generateHybridLakeSplits();
     }
 
     private boolean ignoreTableBucket(TableBucket tableBucket) {
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
index e2f28b302..47b31a57e 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
@@ -32,6 +32,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimonLiteral;
+
 /**
  * Converts a Fluss {@link com.alibaba.fluss.predicate.Predicate} into a 
Paimon {@link Predicate}.
  *
@@ -43,9 +45,11 @@ public class FlussToPaimonPredicateConverter implements 
PredicateVisitor<Predica
 
     private final PredicateBuilder builder;
     private final LeafFunctionConverter converter = new 
LeafFunctionConverter();
+    private final RowType paimonRowType;
 
     public FlussToPaimonPredicateConverter(RowType rowType) {
         this.builder = new PredicateBuilder(rowType);
+        this.paimonRowType = rowType;
     }
 
     public static Optional<Predicate> convert(
@@ -97,57 +101,74 @@ public class FlussToPaimonPredicateConverter implements 
PredicateVisitor<Predica
 
         @Override
         public Predicate visitStartsWith(FieldRef fieldRef, Object literal) {
-            return builder.startsWith(fieldRef.index(), literal);
+            return builder.startsWith(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitEndsWith(FieldRef fieldRef, Object literal) {
-            return builder.endsWith(fieldRef.index(), literal);
+            return builder.endsWith(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitContains(FieldRef fieldRef, Object literal) {
-            return builder.contains(fieldRef.index(), literal);
+            return builder.contains(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitLessThan(FieldRef fieldRef, Object literal) {
-            return builder.lessThan(fieldRef.index(), literal);
+            return builder.lessThan(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitGreaterOrEqual(FieldRef fieldRef, Object 
literal) {
-            return builder.greaterOrEqual(fieldRef.index(), literal);
+            return builder.greaterOrEqual(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitNotEqual(FieldRef fieldRef, Object literal) {
-            return builder.notEqual(fieldRef.index(), literal);
+            return builder.notEqual(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitLessOrEqual(FieldRef fieldRef, Object literal) {
-            return builder.lessOrEqual(fieldRef.index(), literal);
+            return builder.lessOrEqual(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitEqual(FieldRef fieldRef, Object literal) {
-            return builder.equal(fieldRef.index(), literal);
+            return builder.equal(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitGreaterThan(FieldRef fieldRef, Object literal) {
-            return builder.greaterThan(fieldRef.index(), literal);
+            return builder.greaterThan(
+                    fieldRef.index(), convertToPaimonLiteral(fieldRef.index(), 
literal));
         }
 
         @Override
         public Predicate visitIn(FieldRef fieldRef, List<Object> literals) {
-            return builder.in(fieldRef.index(), literals);
+            return builder.in(
+                    fieldRef.index(),
+                    literals.stream()
+                            .map(literal -> 
convertToPaimonLiteral(fieldRef.index(), literal))
+                            .collect(Collectors.toList()));
         }
 
         @Override
         public Predicate visitNotIn(FieldRef fieldRef, List<Object> literals) {
-            return builder.notIn(fieldRef.index(), literals);
+            return builder.notIn(
+                    fieldRef.index(),
+                    literals.stream()
+                            .map(literal -> 
convertToPaimonLiteral(fieldRef.index(), literal))
+                            .collect(Collectors.toList()));
         }
 
         @Override
@@ -161,5 +182,9 @@ public class FlussToPaimonPredicateConverter implements 
PredicateVisitor<Predica
             // shouldn't come to here
             throw new UnsupportedOperationException("Unsupported visitOr 
method.");
         }
+
+        private Object convertToPaimonLiteral(int fieldIndex, Object 
flussLiteral) {
+            return toPaimonLiteral(paimonRowType.getTypeAt(fieldIndex), 
flussLiteral);
+        }
     }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
index 9d51e787b..071704565 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
@@ -17,15 +17,20 @@
 
 package com.alibaba.fluss.lake.paimon.utils;
 
+import com.alibaba.fluss.lake.paimon.source.FlussRowAsPaimonRow;
 import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -93,4 +98,12 @@ public class PaimonConversions {
         writer.complete();
         return partitionBinaryRow;
     }
+
+    public static Object toPaimonLiteral(DataType dataType, Object 
flussLiteral) {
+        RowType rowType = RowType.of(dataType);
+        InternalRow flussRow = GenericRow.of(flussLiteral);
+        FlussRowAsPaimonRow flussRowAsPaimonRow = new 
FlussRowAsPaimonRow(flussRow, rowType);
+        return org.apache.paimon.data.InternalRow.createFieldGetter(dataType, 
0)
+                .getFieldOrNull(flussRowAsPaimonRow);
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 290a51d10..01e8db19f 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -95,11 +95,37 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
                         .map(row -> Row.of(row.getField(1)))
                         .collect(Collectors.toList());
         assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+        if (isPartitioned) {
+            // get first partition
+            String partition = 
waitUntilPartitions(t1).values().iterator().next();
+            String sqlWithPartitionFilter =
+                    "select * FROM " + tableName + " WHERE p = '" + partition 
+ "'";
+
+            String plan = batchTEnv.explainSql(sqlWithPartitionFilter);
+
+            // check if the plan contains partition filter
+            assertThat(plan)
+                    .contains("TableSourceScan(")
+                    .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
+
+            List<Row> expectedFiltered =
+                    writtenRows.stream()
+                            .filter(r -> partition.equals(r.getField(15)))
+                            .collect(Collectors.toList());
+
+            List<Row> actualFiltered =
+                    CollectionUtil.iteratorToList(
+                            
batchTEnv.executeSql(sqlWithPartitionFilter).collect());
+
+            
assertThat(actualFiltered).containsExactlyInAnyOrderElementsOf(expectedFiltered);
+        }
     }
 
     private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, 
List<Row> flinkRows)
             throws Exception {
+        // createFullTypeLogTable creates a datalake-enabled table with a 
partition column.
         long t1Id = createFullTypeLogTable(tablePath, bucketNum, 
isPartitioned);
         if (isPartitioned) {
             Map<Long, String> partitionNameById = 
waitUntilPartitions(tablePath);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
index 6b2c7ed56..bb2808107 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
@@ -107,7 +107,9 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
         // test all filter can be accepted
         Predicate filter1 = FLUSS_BUILDER.greaterOrEqual(0, 2);
         Predicate filter2 = FLUSS_BUILDER.lessOrEqual(0, 3);
-        List<Predicate> allFilters = Arrays.asList(filter1, filter2);
+        Predicate filter3 =
+                FLUSS_BUILDER.startsWith(1, 
com.alibaba.fluss.row.BinaryString.fromString("name"));
+        List<Predicate> allFilters = Arrays.asList(filter1, filter2, filter3);
 
         LakeSource<PaimonSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
         LakeSource.FilterPushDownResult filterPushDownResult = 
lakeSource.withFilters(allFilters);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
index 5afec4177..cf6f40750 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
@@ -18,6 +18,7 @@ package com.alibaba.fluss.lake.paimon.utils;
 
 import com.alibaba.fluss.predicate.Predicate;
 import com.alibaba.fluss.predicate.PredicateBuilder;
+import com.alibaba.fluss.row.BinaryString;
 import com.alibaba.fluss.types.DataTypes;
 import com.alibaba.fluss.types.RowType;
 
@@ -26,8 +27,10 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static com.alibaba.fluss.row.BinaryString.fromString;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link FlussToPaimonPredicateConverter}. */
@@ -56,7 +59,9 @@ class FlussToPaimonPredicateConverterTest {
         return Stream.of(
                 // Leaf Predicates
                 Arguments.of(FLUSS_BUILDER.equal(0, 12L), 
PAIMON_BUILDER.equal(0, 12L)),
-                Arguments.of(FLUSS_BUILDER.notEqual(2, "test"), 
PAIMON_BUILDER.notEqual(2, "test")),
+                Arguments.of(
+                        FLUSS_BUILDER.notEqual(2, fromString("test")),
+                        PAIMON_BUILDER.notEqual(2, "test")),
                 Arguments.of(
                         FLUSS_BUILDER.greaterThan(1, 99.9d), 
PAIMON_BUILDER.greaterThan(1, 99.9d)),
                 Arguments.of(
@@ -67,15 +72,21 @@ class FlussToPaimonPredicateConverterTest {
                 Arguments.of(FLUSS_BUILDER.isNull(2), 
PAIMON_BUILDER.isNull(2)),
                 Arguments.of(FLUSS_BUILDER.isNotNull(1), 
PAIMON_BUILDER.isNotNull(1)),
                 Arguments.of(
-                        FLUSS_BUILDER.in(2, Arrays.asList("a", "b", "c")),
+                        FLUSS_BUILDER.in(
+                                2,
+                                Stream.of("a", "b", "c")
+                                        .map(BinaryString::fromString)
+                                        .collect(Collectors.toList())),
                         PAIMON_BUILDER.in(2, Arrays.asList("a", "b", "c"))),
                 Arguments.of(
                         FLUSS_BUILDER.in(
                                 2,
-                                Arrays.asList(
-                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b", "c",
-                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
-                                        "c")),
+                                Stream.of(
+                                                "a", "b", "c", "a", "b", "c", 
"a", "b", "c", "a",
+                                                "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                                "c", "a", "b", "c")
+                                        .map(BinaryString::fromString)
+                                        .collect(Collectors.toList())),
                         PAIMON_BUILDER.in(
                                 2,
                                 Arrays.asList(
@@ -85,10 +96,12 @@ class FlussToPaimonPredicateConverterTest {
                 Arguments.of(
                         FLUSS_BUILDER.notIn(
                                 2,
-                                Arrays.asList(
-                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b", "c",
-                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
-                                        "c")),
+                                Stream.of(
+                                                "a", "b", "c", "a", "b", "c", 
"a", "b", "c", "a",
+                                                "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                                "c", "a", "b", "c")
+                                        .map(BinaryString::fromString)
+                                        .collect(Collectors.toList())),
                         PAIMON_BUILDER.notIn(
                                 2,
                                 Arrays.asList(
@@ -96,10 +109,14 @@ class FlussToPaimonPredicateConverterTest {
                                         "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
                                         "c"))),
                 Arguments.of(
-                        FLUSS_BUILDER.startsWith(2, "start"),
+                        FLUSS_BUILDER.startsWith(2, fromString("start")),
                         PAIMON_BUILDER.startsWith(2, "start")),
-                Arguments.of(FLUSS_BUILDER.endsWith(2, "end"), 
PAIMON_BUILDER.endsWith(2, "end")),
-                Arguments.of(FLUSS_BUILDER.contains(2, "mid"), 
PAIMON_BUILDER.contains(2, "mid")),
+                Arguments.of(
+                        FLUSS_BUILDER.endsWith(2, fromString("end")),
+                        PAIMON_BUILDER.endsWith(2, "end")),
+                Arguments.of(
+                        FLUSS_BUILDER.contains(2, fromString("mid")),
+                        PAIMON_BUILDER.contains(2, "mid")),
 
                 // Compound Predicates
                 Arguments.of(
@@ -118,7 +135,7 @@ class FlussToPaimonPredicateConverterTest {
                 // Nested Predicate
                 Arguments.of(
                         PredicateBuilder.and(
-                                FLUSS_BUILDER.equal(2, "test"),
+                                FLUSS_BUILDER.equal(2, fromString("test")),
                                 PredicateBuilder.or(
                                         FLUSS_BUILDER.equal(0, 1L),
                                         FLUSS_BUILDER.greaterThan(1, 50.0))),

Reply via email to the mailing list.