This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 7eefe4ab5 [flink] Apply partition filter to lake in flink source (#1549)
7eefe4ab5 is described below
commit 7eefe4ab58d4040ddcf3d6aef24910b358b5c54f
Author: SeungMin <[email protected]>
AuthorDate: Wed Aug 20 12:26:37 2025 +0900
[flink] Apply partition filter to lake in flink source (#1549)
---
.../fluss/flink/lake/LakeSplitGenerator.java | 18 +++++++--
.../fluss/flink/source/FlinkTableSource.java | 45 ++++++++++++++++++---
.../source/enumerator/FlinkSourceEnumerator.java | 9 +++--
.../utils/FlussToPaimonPredicateConverter.java | 47 +++++++++++++++++-----
.../fluss/lake/paimon/utils/PaimonConversions.java | 13 ++++++
.../paimon/flink/FlinkUnionReadLogTableITCase.java | 26 ++++++++++++
.../lake/paimon/source/PaimonLakeSourceTest.java | 4 +-
.../utils/FlussToPaimonPredicateConverterTest.java | 45 ++++++++++++++-------
8 files changed, 167 insertions(+), 40 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
index 1d8198a1f..4a8d47952 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
@@ -38,10 +38,13 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static
com.alibaba.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
/** A generator for lake splits. */
public class LakeSplitGenerator {
@@ -51,6 +54,7 @@ public class LakeSplitGenerator {
private final OffsetsInitializer.BucketOffsetsRetriever
bucketOffsetsRetriever;
private final OffsetsInitializer stoppingOffsetInitializer;
private final int bucketCount;
+ private final Supplier<Set<PartitionInfo>> listPartitionSupplier;
private final LakeSource<LakeSplit> lakeSource;
@@ -60,13 +64,15 @@ public class LakeSplitGenerator {
LakeSource<LakeSplit> lakeSource,
OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
OffsetsInitializer stoppingOffsetInitializer,
- int bucketCount) {
+ int bucketCount,
+ Supplier<Set<PartitionInfo>> listPartitionSupplier) {
this.tableInfo = tableInfo;
this.flussAdmin = flussAdmin;
this.lakeSource = lakeSource;
this.bucketOffsetsRetriever = bucketOffsetsRetriever;
this.stoppingOffsetInitializer = stoppingOffsetInitializer;
this.bucketCount = bucketCount;
+ this.listPartitionSupplier = listPartitionSupplier;
}
public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
@@ -83,9 +89,13 @@ public class LakeSplitGenerator {
.createPlanner(
(LakeSource.PlannerContext)
lakeSnapshotInfo::getSnapshotId)
.plan());
+
+ if (lakeSplits.isEmpty()) {
+ return Collections.emptyList();
+ }
+
if (isPartitioned) {
- List<PartitionInfo> partitionInfos =
-
flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
+ Set<PartitionInfo> partitionInfos = listPartitionSupplier.get();
Map<Long, String> partitionNameById =
partitionInfos.stream()
.collect(
@@ -109,7 +119,7 @@ public class LakeSplitGenerator {
private Map<String, Map<Integer, List<LakeSplit>>>
groupLakeSplits(List<LakeSplit> lakeSplits) {
Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>();
for (LakeSplit split : lakeSplits) {
- String partition = String.join("$", split.partition());
+ String partition = String.join(PARTITION_SPEC_SEPARATOR,
split.partition());
int bucket = split.bucket();
// Get or create the partition group
Map<Integer, List<LakeSplit>> bucketMap =
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
index deb986a50..15ee21c72 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
@@ -32,6 +32,8 @@ import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.source.LakeSplit;
import com.alibaba.fluss.metadata.MergeEngineType;
import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.predicate.Predicate;
+import com.alibaba.fluss.predicate.PredicateBuilder;
import com.alibaba.fluss.types.RowType;
import org.apache.flink.annotation.VisibleForTesting;
@@ -66,6 +68,8 @@ import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -80,6 +84,7 @@ import java.util.Map;
import static com.alibaba.fluss.flink.utils.LakeSourceUtils.createLakeSource;
import static
com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
+import static
com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
import static com.alibaba.fluss.flink.utils.PushdownUtils.extractFieldEquals;
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
@@ -93,6 +98,8 @@ public class FlinkTableSource
SupportsLimitPushDown,
SupportsAggregatePushDown {
+ public static final Logger LOG =
LoggerFactory.getLogger(FlinkTableSource.class);
+
private final TablePath tablePath;
private final Configuration flussConfig;
// output type before projection pushdown
@@ -404,9 +411,6 @@ public class FlinkTableSource
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
- if (lakeSource != null) {
- // todo: use real filters
- }
List<ResolvedExpression> acceptedFilters = new ArrayList<>();
List<ResolvedExpression> remainingFilters = new ArrayList<>();
@@ -449,11 +453,40 @@ public class FlinkTableSource
getPartitionKeyTypes(),
acceptedFilters,
remainingFilters,
- FLINK_INTERNAL_VALUE);
+ FLUSS_INTERNAL_VALUE);
+
// partitions are filtered by string representations, convert the equals to string first
- fieldEquals = stringifyFieldEquals(fieldEquals);
+ partitionFilters = stringifyFieldEquals(fieldEquals);
+
+ // lake source is not null
+ if (lakeSource != null) {
+ // and exist field equals, push down to lake source
+ if (!fieldEquals.isEmpty()) {
+ // convert flink row type to fluss row type
+ RowType flussRowType =
FlinkConversions.toFlussRowType(tableOutputType);
+
+ List<Predicate> lakePredicates = new ArrayList<>();
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(flussRowType);
+
+ for (FieldEqual fieldEqual : fieldEquals) {
+ lakePredicates.add(
+ predicateBuilder.equal(
+ fieldEqual.fieldIndex,
fieldEqual.equalValue));
+ }
- this.partitionFilters = fieldEquals;
+ if (!lakePredicates.isEmpty()) {
+ final LakeSource.FilterPushDownResult
filterPushDownResult =
+ lakeSource.withFilters(lakePredicates);
+ if (filterPushDownResult.acceptedPredicates().size()
+ != lakePredicates.size()) {
+ LOG.info(
+ "LakeSource rejected some partition filters. Falling back to Flink-side filtering.");
+ // Flink will apply all filters to preserve correctness
+ return Result.of(Collections.emptyList(), filters);
+ }
+ }
+ }
+ }
return Result.of(acceptedFilters, remainingFilters);
} else {
return Result.of(Collections.emptyList(), filters);
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 05ac7fa6e..3d29cbbf1 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -62,6 +62,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -296,7 +297,7 @@ public class FlinkSourceEnumerator
try {
List<PartitionInfo> partitionInfos =
flussAdmin.listPartitionInfos(tablePath).get();
partitionInfos = applyPartitionFilter(partitionInfos);
- return new HashSet<>(partitionInfos);
+ return new LinkedHashSet<>(partitionInfos);
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Failed to list partitions for %s",
tablePath),
@@ -519,9 +520,9 @@ public class FlinkSourceEnumerator
lakeSource,
bucketOffsetsRetriever,
stoppingOffsetsInitializer,
- tableInfo.getNumBuckets());
- List<SourceSplitBase> lakeSplits =
lakeSplitGenerator.generateHybridLakeSplits();
- return lakeSplits;
+ tableInfo.getNumBuckets(),
+ this::listPartitions);
+ return lakeSplitGenerator.generateHybridLakeSplits();
}
private boolean ignoreTableBucket(TableBucket tableBucket) {
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
index e2f28b302..47b31a57e 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverter.java
@@ -32,6 +32,8 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static
com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimonLiteral;
+
/**
* Converts a Fluss {@link com.alibaba.fluss.predicate.Predicate} into a Paimon {@link Predicate}.
*
@@ -43,9 +45,11 @@ public class FlussToPaimonPredicateConverter implements
PredicateVisitor<Predica
private final PredicateBuilder builder;
private final LeafFunctionConverter converter = new
LeafFunctionConverter();
+ private final RowType paimonRowType;
public FlussToPaimonPredicateConverter(RowType rowType) {
this.builder = new PredicateBuilder(rowType);
+ this.paimonRowType = rowType;
}
public static Optional<Predicate> convert(
@@ -97,57 +101,74 @@ public class FlussToPaimonPredicateConverter implements
PredicateVisitor<Predica
@Override
public Predicate visitStartsWith(FieldRef fieldRef, Object literal) {
- return builder.startsWith(fieldRef.index(), literal);
+ return builder.startsWith(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitEndsWith(FieldRef fieldRef, Object literal) {
- return builder.endsWith(fieldRef.index(), literal);
+ return builder.endsWith(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitContains(FieldRef fieldRef, Object literal) {
- return builder.contains(fieldRef.index(), literal);
+ return builder.contains(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitLessThan(FieldRef fieldRef, Object literal) {
- return builder.lessThan(fieldRef.index(), literal);
+ return builder.lessThan(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitGreaterOrEqual(FieldRef fieldRef, Object
literal) {
- return builder.greaterOrEqual(fieldRef.index(), literal);
+ return builder.greaterOrEqual(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitNotEqual(FieldRef fieldRef, Object literal) {
- return builder.notEqual(fieldRef.index(), literal);
+ return builder.notEqual(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitLessOrEqual(FieldRef fieldRef, Object literal) {
- return builder.lessOrEqual(fieldRef.index(), literal);
+ return builder.lessOrEqual(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitEqual(FieldRef fieldRef, Object literal) {
- return builder.equal(fieldRef.index(), literal);
+ return builder.equal(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitGreaterThan(FieldRef fieldRef, Object literal) {
- return builder.greaterThan(fieldRef.index(), literal);
+ return builder.greaterThan(
+ fieldRef.index(), convertToPaimonLiteral(fieldRef.index(),
literal));
}
@Override
public Predicate visitIn(FieldRef fieldRef, List<Object> literals) {
- return builder.in(fieldRef.index(), literals);
+ return builder.in(
+ fieldRef.index(),
+ literals.stream()
+ .map(literal ->
convertToPaimonLiteral(fieldRef.index(), literal))
+ .collect(Collectors.toList()));
}
@Override
public Predicate visitNotIn(FieldRef fieldRef, List<Object> literals) {
- return builder.notIn(fieldRef.index(), literals);
+ return builder.notIn(
+ fieldRef.index(),
+ literals.stream()
+ .map(literal ->
convertToPaimonLiteral(fieldRef.index(), literal))
+ .collect(Collectors.toList()));
}
@Override
@@ -161,5 +182,9 @@ public class FlussToPaimonPredicateConverter implements
PredicateVisitor<Predica
// shouldn't come to here
throw new UnsupportedOperationException("Unsupported visitOr method.");
}
+
+ private Object convertToPaimonLiteral(int fieldIndex, Object
flussLiteral) {
+ return toPaimonLiteral(paimonRowType.getTypeAt(fieldIndex),
flussLiteral);
+ }
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
index 9d51e787b..071704565 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
@@ -17,15 +17,20 @@
package com.alibaba.fluss.lake.paimon.utils;
+import com.alibaba.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -93,4 +98,12 @@ public class PaimonConversions {
writer.complete();
return partitionBinaryRow;
}
+
+ public static Object toPaimonLiteral(DataType dataType, Object
flussLiteral) {
+ RowType rowType = RowType.of(dataType);
+ InternalRow flussRow = GenericRow.of(flussLiteral);
+ FlussRowAsPaimonRow flussRowAsPaimonRow = new
FlussRowAsPaimonRow(flussRow, rowType);
+ return org.apache.paimon.data.InternalRow.createFieldGetter(dataType,
0)
+ .getFieldOrNull(flussRowAsPaimonRow);
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 290a51d10..01e8db19f 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -95,11 +95,37 @@ class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
.map(row -> Row.of(row.getField(1)))
.collect(Collectors.toList());
assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+ if (isPartitioned) {
+ // get first partition
+ String partition =
waitUntilPartitions(t1).values().iterator().next();
+ String sqlWithPartitionFilter =
+ "select * FROM " + tableName + " WHERE p = '" + partition
+ "'";
+
+ String plan = batchTEnv.explainSql(sqlWithPartitionFilter);
+
+ // check if the plan contains partition filter
+ assertThat(plan)
+ .contains("TableSourceScan(")
+ .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
+
+ List<Row> expectedFiltered =
+ writtenRows.stream()
+ .filter(r -> partition.equals(r.getField(15)))
+ .collect(Collectors.toList());
+
+ List<Row> actualFiltered =
+ CollectionUtil.iteratorToList(
+
batchTEnv.executeSql(sqlWithPartitionFilter).collect());
+
+
assertThat(actualFiltered).containsExactlyInAnyOrderElementsOf(expectedFiltered);
+ }
}
private long prepareLogTable(
TablePath tablePath, int bucketNum, boolean isPartitioned,
List<Row> flinkRows)
throws Exception {
+ // createFullTypeLogTable creates a datalake-enabled table with a partition column.
long t1Id = createFullTypeLogTable(tablePath, bucketNum,
isPartitioned);
if (isPartitioned) {
Map<Long, String> partitionNameById =
waitUntilPartitions(tablePath);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
index 6b2c7ed56..bb2808107 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSourceTest.java
@@ -107,7 +107,9 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
// test all filter can be accepted
Predicate filter1 = FLUSS_BUILDER.greaterOrEqual(0, 2);
Predicate filter2 = FLUSS_BUILDER.lessOrEqual(0, 3);
- List<Predicate> allFilters = Arrays.asList(filter1, filter2);
+ Predicate filter3 =
+ FLUSS_BUILDER.startsWith(1,
com.alibaba.fluss.row.BinaryString.fromString("name"));
+ List<Predicate> allFilters = Arrays.asList(filter1, filter2, filter3);
LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
LakeSource.FilterPushDownResult filterPushDownResult =
lakeSource.withFilters(allFilters);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
index 5afec4177..cf6f40750 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/utils/FlussToPaimonPredicateConverterTest.java
@@ -18,6 +18,7 @@ package com.alibaba.fluss.lake.paimon.utils;
import com.alibaba.fluss.predicate.Predicate;
import com.alibaba.fluss.predicate.PredicateBuilder;
+import com.alibaba.fluss.row.BinaryString;
import com.alibaba.fluss.types.DataTypes;
import com.alibaba.fluss.types.RowType;
@@ -26,8 +27,10 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static com.alibaba.fluss.row.BinaryString.fromString;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link FlussToPaimonPredicateConverter}. */
@@ -56,7 +59,9 @@ class FlussToPaimonPredicateConverterTest {
return Stream.of(
// Leaf Predicates
Arguments.of(FLUSS_BUILDER.equal(0, 12L),
PAIMON_BUILDER.equal(0, 12L)),
- Arguments.of(FLUSS_BUILDER.notEqual(2, "test"),
PAIMON_BUILDER.notEqual(2, "test")),
+ Arguments.of(
+ FLUSS_BUILDER.notEqual(2, fromString("test")),
+ PAIMON_BUILDER.notEqual(2, "test")),
Arguments.of(
FLUSS_BUILDER.greaterThan(1, 99.9d),
PAIMON_BUILDER.greaterThan(1, 99.9d)),
Arguments.of(
@@ -67,15 +72,21 @@ class FlussToPaimonPredicateConverterTest {
Arguments.of(FLUSS_BUILDER.isNull(2),
PAIMON_BUILDER.isNull(2)),
Arguments.of(FLUSS_BUILDER.isNotNull(1),
PAIMON_BUILDER.isNotNull(1)),
Arguments.of(
- FLUSS_BUILDER.in(2, Arrays.asList("a", "b", "c")),
+ FLUSS_BUILDER.in(
+ 2,
+ Stream.of("a", "b", "c")
+ .map(BinaryString::fromString)
+ .collect(Collectors.toList())),
PAIMON_BUILDER.in(2, Arrays.asList("a", "b", "c"))),
Arguments.of(
FLUSS_BUILDER.in(
2,
- Arrays.asList(
- "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b", "c",
- "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
- "c")),
+ Stream.of(
+ "a", "b", "c", "a", "b", "c",
"a", "b", "c", "a",
+ "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c", "a", "b", "c")
+ .map(BinaryString::fromString)
+ .collect(Collectors.toList())),
PAIMON_BUILDER.in(
2,
Arrays.asList(
@@ -85,10 +96,12 @@ class FlussToPaimonPredicateConverterTest {
Arguments.of(
FLUSS_BUILDER.notIn(
2,
- Arrays.asList(
- "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b", "c",
- "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
- "c")),
+ Stream.of(
+ "a", "b", "c", "a", "b", "c",
"a", "b", "c", "a",
+ "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c", "a", "b", "c")
+ .map(BinaryString::fromString)
+ .collect(Collectors.toList())),
PAIMON_BUILDER.notIn(
2,
Arrays.asList(
@@ -96,10 +109,14 @@ class FlussToPaimonPredicateConverterTest {
"a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
"c"))),
Arguments.of(
- FLUSS_BUILDER.startsWith(2, "start"),
+ FLUSS_BUILDER.startsWith(2, fromString("start")),
PAIMON_BUILDER.startsWith(2, "start")),
- Arguments.of(FLUSS_BUILDER.endsWith(2, "end"),
PAIMON_BUILDER.endsWith(2, "end")),
- Arguments.of(FLUSS_BUILDER.contains(2, "mid"),
PAIMON_BUILDER.contains(2, "mid")),
+ Arguments.of(
+ FLUSS_BUILDER.endsWith(2, fromString("end")),
+ PAIMON_BUILDER.endsWith(2, "end")),
+ Arguments.of(
+ FLUSS_BUILDER.contains(2, fromString("mid")),
+ PAIMON_BUILDER.contains(2, "mid")),
// Compound Predicates
Arguments.of(
@@ -118,7 +135,7 @@ class FlussToPaimonPredicateConverterTest {
// Nested Predicate
Arguments.of(
PredicateBuilder.and(
- FLUSS_BUILDER.equal(2, "test"),
+ FLUSS_BUILDER.equal(2, fromString("test")),
PredicateBuilder.or(
FLUSS_BUILDER.equal(0, 1L),
FLUSS_BUILDER.greaterThan(1, 50.0))),