This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 9f0fbf527 [lake/iceberg] Iceberg read implementation supports filter push down (#1715)
9f0fbf527 is described below
commit 9f0fbf527801e75897dfc2b55f7b9c545fdc3f62
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Sep 19 10:19:59 2025 +0800
[lake/iceberg] Iceberg read implementation supports filter push down (#1715)
---
.../iceberg/source/FlussRowAsIcebergRecord.java | 182 ++++++++++++++++++
.../lake/iceberg/source/IcebergLakeSource.java | 35 +++-
.../lake/iceberg/source/IcebergRecordReader.java | 8 +-
.../lake/iceberg/source/IcebergSplitPlanner.java | 20 +-
.../tiering/FlussRecordAsIcebergRecord.java | 164 +++-------------
.../fluss/lake/iceberg/tiering/RecordWriter.java | 2 +-
.../utils/FlussToIcebergPredicateConverter.java | 206 +++++++++++++++++++++
.../lake/iceberg/utils/IcebergConversions.java | 48 +++++
.../flink/FlinkUnionReadLogTableITCase.java | 5 +-
.../iceberg/source/IcebergLakeSourceTest.java} | 129 +++++--------
.../FlussToIcebergPredicateConverterTest.java | 171 +++++++++++++++++
.../lake/iceberg/utils/IcebergConversionsTest.java | 1 -
.../fluss/lake/paimon/source/PaimonLakeSource.java | 3 +-
.../lake/paimon/source/PaimonLakeSourceTest.java | 2 +-
14 files changed, 739 insertions(+), 237 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
new file mode 100644
index 000000000..805bddcb6
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.DateTimeUtils;
+
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Map;
+
+/**
+ * Wraps a Fluss {@link InternalRow} as a read-only Iceberg {@link Record}.
+ *
+ * <p>One converter per Fluss field position is built once in the constructor; a field is converted
+ * to the Java object Iceberg's generic records use each time it is read. Mutating and copying
+ * methods throw {@link UnsupportedOperationException}.
+ */
+public class FlussRowAsIcebergRecord implements Record {
+
+    /** The wrapped row; subclasses may rebind it so one wrapper instance can serve many rows. */
+    protected InternalRow internalRow;
+
+    protected final Types.StructType structType;
+    protected final RowType flussRowType;
+
+    /** Converters indexed by Fluss field position. */
+    private final FlussRowToIcebergFieldConverter[] fieldConverters;
+
+    /**
+     * Creates a wrapper that is not yet bound to a row; {@link #internalRow} must be assigned
+     * before any field is read.
+     *
+     * @param structType the Iceberg struct type reported by {@link #struct()}
+     * @param flussRowType the Fluss row type used to choose the converter of each field
+     * @throws UnsupportedOperationException if {@code flussRowType} contains a type that has no
+     *     Iceberg conversion
+     */
+    public FlussRowAsIcebergRecord(Types.StructType structType, RowType flussRowType) {
+        this.structType = structType;
+        this.flussRowType = flussRowType;
+        this.fieldConverters = new FlussRowToIcebergFieldConverter[flussRowType.getFieldCount()];
+        for (int pos = 0; pos < fieldConverters.length; pos++) {
+            fieldConverters[pos] = createTypeConverter(flussRowType.getTypeAt(pos), pos);
+        }
+    }
+
+    /**
+     * Creates a wrapper bound to {@code internalRow}.
+     *
+     * @param structType the Iceberg struct type reported by {@link #struct()}
+     * @param flussRowType the Fluss row type used to choose the converter of each field
+     * @param internalRow the row whose fields are exposed
+     */
+    public FlussRowAsIcebergRecord(
+            Types.StructType structType, RowType flussRowType, InternalRow internalRow) {
+        this(structType, flussRowType);
+        this.internalRow = internalRow;
+    }
+
+    @Override
+    public Types.StructType struct() {
+        return structType;
+    }
+
+    /**
+     * Returns the value of the field called {@code name}, or {@code null} when the struct has no
+     * such field (the same contract as Iceberg's {@code GenericRecord#getField}).
+     */
+    @Override
+    public Object getField(String name) {
+        Types.NestedField field = structType.field(name);
+        if (field == null) {
+            // Without this guard indexOf(null) is -1 and get(-1) would be called with an invalid
+            // position.
+            return null;
+        }
+        return get(structType.fields().indexOf(field));
+    }
+
+    @Override
+    public void setField(String name, Object value) {
+        throw new UnsupportedOperationException("method setField is not supported.");
+    }
+
+    /** Returns the converted value at {@code pos}, or {@code null} if the Fluss value is null. */
+    @Override
+    public Object get(int pos) {
+        if (internalRow.isNullAt(pos)) {
+            return null;
+        }
+        return fieldConverters[pos].convert(internalRow);
+    }
+
+    @Override
+    public Record copy() {
+        throw new UnsupportedOperationException("method copy is not supported.");
+    }
+
+    @Override
+    public Record copy(Map<String, Object> overwriteValues) {
+        throw new UnsupportedOperationException("method copy is not supported.");
+    }
+
+    @Override
+    public int size() {
+        return structType.fields().size();
+    }
+
+    /**
+     * Returns the value at {@code pos} cast to {@code javaClass}.
+     *
+     * @throws IllegalStateException if the value is non-null and not an instance of {@code
+     *     javaClass}
+     */
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+        Object value = get(pos);
+        if (value == null || javaClass.isInstance(value)) {
+            return javaClass.cast(value);
+        } else {
+            throw new IllegalStateException(
+                    "Not an instance of " + javaClass.getName() + ": " + value);
+        }
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+        throw new UnsupportedOperationException("method set is not supported.");
+    }
+
+    /** Reads one field of a Fluss row and converts it to the object Iceberg expects. */
+    @FunctionalInterface
+    private interface FlussRowToIcebergFieldConverter {
+        Object convert(InternalRow value);
+    }
+
+    /**
+     * Creates the converter for the field at {@code pos}.
+     *
+     * @throws UnsupportedOperationException if {@code flussType} has no Iceberg conversion
+     */
+    private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType, int pos) {
+        if (flussType instanceof BooleanType) {
+            return row -> row.getBoolean(pos);
+        } else if (flussType instanceof TinyIntType) {
+            // Widened to int, the smallest Iceberg integer type.
+            return row -> (int) row.getByte(pos);
+        } else if (flussType instanceof SmallIntType) {
+            return row -> (int) row.getShort(pos);
+        } else if (flussType instanceof IntType) {
+            return row -> row.getInt(pos);
+        } else if (flussType instanceof BigIntType) {
+            return row -> row.getLong(pos);
+        } else if (flussType instanceof FloatType) {
+            return row -> row.getFloat(pos);
+        } else if (flussType instanceof DoubleType) {
+            return row -> row.getDouble(pos);
+        } else if (flussType instanceof StringType) {
+            return row -> row.getString(pos).toString();
+        } else if (flussType instanceof CharType) {
+            CharType charType = (CharType) flussType;
+            return row -> row.getChar(pos, charType.getLength()).toString();
+        } else if (flussType instanceof BytesType || flussType instanceof BinaryType) {
+            // Binary values are exposed as ByteBuffer.
+            return row -> ByteBuffer.wrap(row.getBytes(pos));
+        } else if (flussType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) flussType;
+            return row ->
+                    row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale())
+                            .toBigDecimal();
+        } else if (flussType instanceof LocalZonedTimestampType) {
+            LocalZonedTimestampType ltzType = (LocalZonedTimestampType) flussType;
+            return row ->
+                    toIcebergTimestampLtz(
+                            row.getTimestampLtz(pos, ltzType.getPrecision()).toInstant());
+        } else if (flussType instanceof TimestampType) {
+            TimestampType tsType = (TimestampType) flussType;
+            return row -> row.getTimestampNtz(pos, tsType.getPrecision()).toLocalDateTime();
+        } else if (flussType instanceof DateType) {
+            return row -> DateTimeUtils.toLocalDate(row.getInt(pos));
+        } else if (flussType instanceof TimeType) {
+            return row -> DateTimeUtils.toLocalTime(row.getInt(pos));
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported data type conversion for Fluss type: "
+                            + flussType.getClass().getSimpleName());
+        }
+    }
+
+    /** Converts an instant to an {@link OffsetDateTime} at UTC, used for timestamp_ltz values. */
+    private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
+        return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
+    }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
index 31eee6fb5..d40a8ad39 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
@@ -19,6 +19,7 @@
package org.apache.fluss.lake.iceberg.source;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.utils.FlussToIcebergPredicateConverter;
import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.lake.source.LakeSource;
@@ -27,14 +28,18 @@ import org.apache.fluss.lake.source.RecordReader;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
@@ -44,6 +49,7 @@ public class IcebergLakeSource implements
LakeSource<IcebergSplit> {
private final Configuration icebergConfig;
private final TablePath tablePath;
private @Nullable int[][] project;
+ private @Nullable Expression filter;
public IcebergLakeSource(Configuration icebergConfig, TablePath tablePath)
{
this.icebergConfig = icebergConfig;
@@ -62,13 +68,29 @@ public class IcebergLakeSource implements
LakeSource<IcebergSplit> {
+    /**
+     * Converts each predicate to an Iceberg expression. Convertible predicates are AND-ed into the
+     * scan filter and reported as consumed; the rest are reported as unconsumed.
+     */
@Override
public FilterPushDownResult withFilters(List<Predicate> predicates) {
-        // TODO: Support filter push down. #1676
-        return FilterPushDownResult.of(Collections.emptyList(), predicates);
+        List<Predicate> unConsumedPredicates = new ArrayList<>();
+        List<Predicate> consumedPredicates = new ArrayList<>();
+        List<Expression> converted = new ArrayList<>();
+        Schema schema = getSchema(tablePath);
+        for (Predicate predicate : predicates) {
+            Optional<Expression> optPredicate =
+                    FlussToIcebergPredicateConverter.convert(schema, predicate);
+            if (optPredicate.isPresent()) {
+                consumedPredicates.add(predicate);
+                converted.add(optPredicate.get());
+            } else {
+                // Not expressible in Iceberg: reported back as unconsumed.
+                unConsumedPredicates.add(predicate);
+            }
+        }
+        // Always overwrite, so the filter reflects exactly the predicates reported as consumed by
+        // this call and no stale filter from an earlier call survives. null = nothing pushed down.
+        filter = converted.stream().reduce(Expressions::and).orElse(null);
+        return FilterPushDownResult.of(consumedPredicates, unConsumedPredicates);
}
@Override
public Planner<IcebergSplit> createPlanner(PlannerContext context) throws
IOException {
- return new IcebergSplitPlanner(icebergConfig, tablePath,
context.snapshotId());
+        // Hand the pushed-down filter (null when nothing was pushed down) to the planner, which
+        // applies it to the Iceberg table scan when it is non-null.
+ return new IcebergSplitPlanner(icebergConfig, tablePath,
context.snapshotId(), filter);
}
@Override
@@ -82,4 +104,9 @@ public class IcebergLakeSource implements
LakeSource<IcebergSplit> {
public SimpleVersionedSerializer<IcebergSplit> getSplitSerializer() {
+        // A new serializer instance is created on every call.
return new IcebergSplitSerializer();
}
+
+    /** Loads the current schema of the Iceberg table that backs {@code tablePath}. */
+    private Schema getSchema(TablePath tablePath) {
+        // NOTE(review): a fresh catalog is created per lookup and never closed; confirm whether
+        // the catalog returned by IcebergCatalogUtils holds resources that need releasing.
+        Catalog icebergCatalog = IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        return icebergTable.schema();
+    }
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
index d848ec152..f654fb5d0 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
@@ -45,7 +45,13 @@ import java.util.stream.IntStream;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
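-/** Iceberg record reader. */
+/**
+ * Iceberg record reader.
+ *
+ * <p>This reader does not evaluate the pushed-down filter itself. The filter is applied to the
+ * table scan in IcebergSplitPlanner, which no longer calls {@code ignoreResiduals()}, so each
+ * {@link FileScanTask} keeps the residual part of the filter that file-level pruning could not
+ * resolve.
+ *
+ * <p>Refer to {@link org.apache.iceberg.data.GenericReader#open(FileScanTask)} and {@link
+ * org.apache.iceberg.Scan#ignoreResiduals()} for how residuals are handled when records are read.
+ */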
public class IcebergRecordReader implements RecordReader {
protected IcebergRecordAsFlussRecordIterator iterator;
protected @Nullable int[][] project;
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
index 89a5dc269..4376108ba 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
@@ -27,9 +27,13 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -47,11 +51,14 @@ public class IcebergSplitPlanner implements
Planner<IcebergSplit> {
private final Configuration icebergConfig;
private final TablePath tablePath;
private final long snapshotId;
+ private final @Nullable Expression filter;
-    public IcebergSplitPlanner(Configuration icebergConfig, TablePath tablePath, long snapshotId) {
+    /**
+     * Creates a planner for one snapshot of an Iceberg table.
+     *
+     * @param icebergConfig configuration used to create the Iceberg catalog
+     * @param tablePath the Fluss path of the table to plan
+     * @param snapshotId the Iceberg snapshot the scan is pinned to
+     * @param filter optional filter applied to the table scan; {@code null} plans every file
+     */
+    public IcebergSplitPlanner(
+            Configuration icebergConfig,
+            TablePath tablePath,
+            long snapshotId,
+            @Nullable Expression filter) {
this.icebergConfig = icebergConfig;
this.tablePath = tablePath;
this.snapshotId = snapshotId;
+        this.filter = filter;
}
@Override
@@ -61,12 +68,11 @@ public class IcebergSplitPlanner implements
Planner<IcebergSplit> {
Table table = catalog.loadTable(toIceberg(tablePath));
Function<FileScanTask, List<String>> partitionExtract =
createPartitionExtractor(table);
Function<FileScanTask, Integer> bucketExtractor =
createBucketExtractor(table);
- try (CloseableIterable<FileScanTask> tasks =
- table.newScan()
- .useSnapshot(snapshotId)
- .includeColumnStats()
- .ignoreResiduals()
- .planFiles()) {
+ TableScan tableScan =
table.newScan().useSnapshot(snapshotId).includeColumnStats();
+ if (filter != null) {
+ tableScan = tableScan.filter(filter);
+ }
+ try (CloseableIterable<FileScanTask> tasks = tableScan.planFiles()) {
tasks.forEach(
task ->
splits.add(
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
index 0748c89c3..6c2296152 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
@@ -17,38 +17,21 @@
package org.apache.fluss.lake.iceberg.tiering;
+import org.apache.fluss.lake.iceberg.source.FlussRowAsIcebergRecord;
import org.apache.fluss.record.LogRecord;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.types.BigIntType;
-import org.apache.fluss.types.BinaryType;
-import org.apache.fluss.types.BooleanType;
-import org.apache.fluss.types.BytesType;
-import org.apache.fluss.types.CharType;
-import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.DateType;
-import org.apache.fluss.types.DecimalType;
-import org.apache.fluss.types.DoubleType;
-import org.apache.fluss.types.FloatType;
-import org.apache.fluss.types.IntType;
-import org.apache.fluss.types.LocalZonedTimestampType;
import org.apache.fluss.types.RowType;
-import org.apache.fluss.types.SmallIntType;
-import org.apache.fluss.types.StringType;
-import org.apache.fluss.types.TimeType;
-import org.apache.fluss.types.TimestampType;
-import org.apache.fluss.types.TinyIntType;
-import org.apache.fluss.utils.DateTimeUtils;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
-import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
-import java.util.Map;
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.fluss.utils.Preconditions.checkState;
/**
@@ -57,24 +40,21 @@ import static
org.apache.fluss.utils.Preconditions.checkState;
* <p>todo: refactor to implement ParquetWriters, OrcWriters, AvroWriters just
like Flink & Spark
* write to iceberg for higher performance
*/
-public class FlussRecordAsIcebergRecord implements Record {
+public class FlussRecordAsIcebergRecord extends FlussRowAsIcebergRecord {
// Lake table for iceberg will append three system columns: __bucket,
__offset,__timestamp
- private static final int LAKE_ICEBERG_SYSTEM_COLUMNS = 3;
+ private static final int LAKE_ICEBERG_SYSTEM_COLUMNS =
SYSTEM_COLUMNS.size();
private LogRecord logRecord;
private final int bucket;
- private final Schema icebergSchema;
- private final RowType flussRowType;
// the origin row fields in fluss, excluding the system columns in iceberg
private int originRowFieldCount;
- private InternalRow internalRow;
- public FlussRecordAsIcebergRecord(int bucket, Schema icebergSchema,
RowType flussRowType) {
+    /**
+     * Creates a reusable wrapper for the records of one bucket.
+     *
+     * @param bucket the value returned for the {@code __bucket} system column
+     * @param structType the Iceberg struct type, i.e. the data columns followed by the system
+     *     columns
+     * @param flussRowType the Fluss row type of the data columns only
+     */
+ public FlussRecordAsIcebergRecord(
+ int bucket, Types.StructType structType, RowType flussRowType) {
+ super(structType, flussRowType);
this.bucket = bucket;
- this.icebergSchema = icebergSchema;
- this.flussRowType = flussRowType;
}
public void setFlussRecord(LogRecord logRecord) {
@@ -82,24 +62,25 @@ public class FlussRecordAsIcebergRecord implements Record {
this.internalRow = logRecord.getRow();
this.originRowFieldCount = internalRow.getFieldCount();
checkState(
- originRowFieldCount
- == icebergSchema.asStruct().fields().size() -
LAKE_ICEBERG_SYSTEM_COLUMNS,
+ originRowFieldCount == structType.fields().size() -
LAKE_ICEBERG_SYSTEM_COLUMNS,
"The Iceberg table fields count must equals to LogRecord's
fields count.");
}
- @Override
- public Types.StructType struct() {
- return icebergSchema.asStruct();
- }
-
@Override
public Object getField(String name) {
-        return icebergSchema;
-    }
-
-    @Override
-    public void setField(String name, Object value) {
-        throw new UnsupportedOperationException("method setField is not supported.");
+        // Data columns are resolved by the parent wrapper.
+        if (!SYSTEM_COLUMNS.containsKey(name)) {
+            return super.getField(name);
+        }
+        // System columns come from the fixed bucket and the current LogRecord.
+        if (BUCKET_COLUMN_NAME.equals(name)) {
+            return bucket;
+        } else if (OFFSET_COLUMN_NAME.equals(name)) {
+            return logRecord.logOffset();
+        } else if (TIMESTAMP_COLUMN_NAME.equals(name)) {
+            return toIcebergTimestampLtz(logRecord.timestamp());
+        }
+        throw new IllegalArgumentException("Unknown system column: " + name);
}
@Override
@@ -113,103 +94,12 @@ public class FlussRecordAsIcebergRecord implements Record
{
return logRecord.logOffset();
} else if (pos == originRowFieldCount + 2) {
// timestamp column
- return getTimestampLtz(logRecord.timestamp());
+ return toIcebergTimestampLtz(logRecord.timestamp());
}
-
- // handle normal columns
- if (internalRow.isNullAt(pos)) {
- return null;
- }
-
- DataType dataType = flussRowType.getTypeAt(pos);
- if (dataType instanceof BooleanType) {
- return internalRow.getBoolean(pos);
- } else if (dataType instanceof TinyIntType) {
- return (int) internalRow.getByte(pos);
- } else if (dataType instanceof SmallIntType) {
- return (int) internalRow.getShort(pos);
- } else if (dataType instanceof IntType) {
- return internalRow.getInt(pos);
- } else if (dataType instanceof BigIntType) {
- return internalRow.getLong(pos);
- } else if (dataType instanceof FloatType) {
- return internalRow.getFloat(pos);
- } else if (dataType instanceof DoubleType) {
- return internalRow.getDouble(pos);
- } else if (dataType instanceof StringType) {
- return internalRow.getString(pos).toString();
- } else if (dataType instanceof CharType) {
- CharType charType = (CharType) dataType;
- return internalRow.getChar(pos, charType.getLength()).toString();
- } else if (dataType instanceof BytesType) {
- return ByteBuffer.wrap(internalRow.getBytes(pos));
- } else if (dataType instanceof BinaryType) {
- // Iceberg's Record interface expects ByteBuffer for binary types.
- return ByteBuffer.wrap(internalRow.getBytes(pos));
- } else if (dataType instanceof DecimalType) {
- // Iceberg expects BigDecimal for decimal types.
- DecimalType decimalType = (DecimalType) dataType;
- return internalRow
- .getDecimal(pos, decimalType.getPrecision(),
decimalType.getScale())
- .toBigDecimal();
- } else if (dataType instanceof LocalZonedTimestampType) {
- // Iceberg expects OffsetDateTime for timestamp with local
timezone.
- return getTimestampLtz(
- internalRow
- .getTimestampLtz(
- pos, ((LocalZonedTimestampType)
dataType).getPrecision())
- .toInstant());
- } else if (dataType instanceof TimestampType) {
- // Iceberg expects LocalDateType for timestamp without local
timezone.
- return internalRow
- .getTimestampNtz(pos, ((TimestampType)
dataType).getPrecision())
- .toLocalDateTime();
- } else if (dataType instanceof DateType) {
- return DateTimeUtils.toLocalDate(internalRow.getInt(pos));
- } else if (dataType instanceof TimeType) {
- return DateTimeUtils.toLocalTime(internalRow.getInt(pos));
- }
- throw new UnsupportedOperationException(
- "Unsupported data type conversion for Fluss type: "
- + dataType.getClass().getName());
+ return super.get(pos);
}
- private OffsetDateTime getTimestampLtz(long timestamp) {
+    /**
+     * Converts epoch milliseconds (the LogRecord timestamp) to an {@link OffsetDateTime} at UTC,
+     * the value exposed for the timestamp system column.
+     */
+ private OffsetDateTime toIcebergTimestampLtz(long timestamp) {
return OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
ZoneOffset.UTC);
}
-
- private OffsetDateTime getTimestampLtz(Instant instant) {
- return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
- }
-
- @Override
- public Record copy() {
- throw new UnsupportedOperationException("method copy is not
supported.");
- }
-
- @Override
- public Record copy(Map<String, Object> overwriteValues) {
- throw new UnsupportedOperationException("method copy is not
supported.");
- }
-
- @Override
- public int size() {
- return icebergSchema.asStruct().fields().size();
- }
-
- @Override
- public <T> T get(int pos, Class<T> javaClass) {
- Object value = get(pos);
- if (value == null || javaClass.isInstance(value)) {
- return javaClass.cast(value);
- } else {
- throw new IllegalStateException(
- "Not an instance of " + javaClass.getName() + ": " +
value);
- }
- }
-
- @Override
- public <T> void set(int pos, T value) {
- throw new UnsupportedOperationException("method set is not
supported.");
- }
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/RecordWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/RecordWriter.java
index 6a78cec15..238ef5be9 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/RecordWriter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/RecordWriter.java
@@ -44,7 +44,7 @@ public abstract class RecordWriter implements AutoCloseable {
this.bucket = tableBucket.getBucket();
this.flussRecordAsIcebergRecord =
new FlussRecordAsIcebergRecord(
- tableBucket.getBucket(), icebergSchema, flussRowType);
+ tableBucket.getBucket(), icebergSchema.asStruct(),
flussRowType);
}
public abstract void write(LogRecord record) throws Exception;
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverter.java
new file mode 100644
index 000000000..77d50bb96
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.utils;
+
+import org.apache.fluss.predicate.And;
+import org.apache.fluss.predicate.CompoundPredicate;
+import org.apache.fluss.predicate.FieldRef;
+import org.apache.fluss.predicate.FunctionVisitor;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.Or;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateVisitor;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static
org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIcebergLiteral;
+
+/**
+ * Converts a Fluss {@link Predicate} into an Iceberg {@link Expression}.
+ *
+ * <p>This class implements the {@link PredicateVisitor} pattern to traverse a tree of Fluss
+ * predicates. Leaf conditions (equals, greater than, in, ...) are delegated to an inner {@link
+ * FunctionVisitor}; compound conditions (AND, OR) are converted recursively. Field references are
+ * resolved by position in {@link Schema#columns()} of the Iceberg schema.
+ *
+ * <p>Conditions Iceberg cannot express (for example {@code endsWith} and {@code contains}) make the
+ * visitor throw {@link UnsupportedOperationException}; {@link #convert(Schema, Predicate)} turns
+ * such failures into an empty {@link Optional}.
+ */
+public class FlussToIcebergPredicateConverter implements PredicateVisitor<Expression> {
+
+    private final Schema icebergSchema;
+    private final LeafFunctionConverter converter = new LeafFunctionConverter();
+
+    /** Creates a converter that resolves field indexes against {@code schema}. */
+    public FlussToIcebergPredicateConverter(Schema schema) {
+        this.icebergSchema = schema;
+    }
+
+    /**
+     * Converts {@code flussPredicate} into an Iceberg expression.
+     *
+     * @param schema the Iceberg schema whose columns the predicate's field indexes refer to
+     * @param flussPredicate the predicate to convert
+     * @return the equivalent Iceberg expression, or empty if any part of the predicate (or one of
+     *     its literals) cannot be converted
+     */
+    public static Optional<Expression> convert(Schema schema, Predicate flussPredicate) {
+        try {
+            return Optional.of(flussPredicate.visit(new FlussToIcebergPredicateConverter(schema)));
+        } catch (UnsupportedOperationException | IllegalStateException e) {
+            // UnsupportedOperationException: the condition or type has no Iceberg counterpart.
+            // IllegalStateException: a literal did not convert to the Java class Iceberg expects
+            // for the column. In both cases the predicate is simply not converted.
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public Expression visit(LeafPredicate predicate) {
+        // Delegate to a dedicated function visitor instead of a long if-instanceof chain.
+        return predicate.visit(converter);
+    }
+
+    @Override
+    public Expression visit(CompoundPredicate predicate) {
+        List<Expression> children =
+                predicate.children().stream().map(p -> p.visit(this)).collect(Collectors.toList());
+
+        CompoundPredicate.Function function = predicate.function();
+        if (function instanceof And) {
+            // alwaysTrue() is the identity of AND: an empty conjunction matches every row.
+            return children.stream().reduce(Expressions::and).orElse(Expressions.alwaysTrue());
+        } else if (function instanceof Or) {
+            // alwaysFalse() is the identity of OR: an empty disjunction matches no row.
+            return children.stream().reduce(Expressions::or).orElse(Expressions.alwaysFalse());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported fluss compound predicate function: " + predicate.function());
+        }
+    }
+
+    /**
+     * Converts each kind of {@link org.apache.fluss.predicate.LeafFunction} to an Iceberg {@link
+     * Expression}, resolving the referenced field by position in the Iceberg schema.
+     */
+    private class LeafFunctionConverter implements FunctionVisitor<Expression> {
+
+        @Override
+        public Expression visitIsNotNull(FieldRef fieldRef) {
+            return Expressions.notNull(fieldName(fieldRef));
+        }
+
+        @Override
+        public Expression visitIsNull(FieldRef fieldRef) {
+            return Expressions.isNull(fieldName(fieldRef));
+        }
+
+        @Override
+        public Expression visitStartsWith(FieldRef fieldRef, Object literal) {
+            return Expressions.startsWith(
+                    fieldName(fieldRef), convertToIcebergLiteral(fieldRef, literal).toString());
+        }
+
+        @Override
+        public Expression visitEndsWith(FieldRef fieldRef, Object literal) {
+            // Iceberg has no endsWith expression.
+            throw new UnsupportedOperationException("Iceberg not supported endswith filter.");
+        }
+
+        @Override
+        public Expression visitContains(FieldRef fieldRef, Object literal) {
+            // Iceberg has no contains expression.
+            throw new UnsupportedOperationException("Iceberg not supported contains filter.");
+        }
+
+        @Override
+        public Expression visitLessThan(FieldRef fieldRef, Object literal) {
+            return Expressions.lessThan(
+                    fieldName(fieldRef), convertToIcebergLiteral(fieldRef, literal));
+        }
+
+        @Override
+        public Expression visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+            return Expressions.greaterThanOrEqual(
+                    fieldName(fieldRef), convertToIcebergLiteral(fieldRef, literal));
+        }
+
+        @Override
+        public Expression visitNotEqual(FieldRef fieldRef, Object literal) {
+            return Expressions.notEqual(
+                    fieldName(fieldRef), convertToIcebergLiteral(fieldRef, literal));
+        }
+
+        @Override
+        public Expression visitLessOrEqual(FieldRef fieldRef, Object literal) {
+            return Expressions.lessThanOrEqual(
+                    fieldName(fieldRef), convertToIcebergLiteral(fieldRef, literal));
+        }
+
+        @Override
+        public Expression visitEqual(FieldRef fieldRef, Object literal) {
+            return Expressions.equal(fieldName(fieldRef), convertToIcebergLiteral(fieldRef, literal));
+        }
+
+        @Override
+        public Expression visitGreaterThan(FieldRef fieldRef, Object literal) {
+            return Expressions.greaterThan(
+                    fieldName(fieldRef), convertToIcebergLiteral(fieldRef, literal));
+        }
+
+        @Override
+        public Expression visitIn(FieldRef fieldRef, List<Object> literals) {
+            return Expressions.in(fieldName(fieldRef), convertAllToIcebergLiterals(fieldRef, literals));
+        }
+
+        @Override
+        public Expression visitNotIn(FieldRef fieldRef, List<Object> literals) {
+            return Expressions.notIn(
+                    fieldName(fieldRef), convertAllToIcebergLiterals(fieldRef, literals));
+        }
+
+        @Override
+        public Expression visitAnd(List<Expression> children) {
+            // Compound predicates are handled by the outer visitor; this is never reached.
+            throw new UnsupportedOperationException("Unsupported visitAnd method.");
+        }
+
+        @Override
+        public Expression visitOr(List<Expression> children) {
+            // Compound predicates are handled by the outer visitor; this is never reached.
+            throw new UnsupportedOperationException("Unsupported visitOr method.");
+        }
+
+        /** Returns the Iceberg column at the referenced position. */
+        private Types.NestedField getField(FieldRef fieldRef) {
+            return icebergSchema.columns().get(fieldRef.index());
+        }
+
+        /** Returns the name of the Iceberg column at the referenced position. */
+        private String fieldName(FieldRef fieldRef) {
+            return getField(fieldRef).name();
+        }
+
+        /** Converts one Fluss literal to the value Iceberg expects for the referenced column. */
+        private Object convertToIcebergLiteral(FieldRef fieldRef, Object flussLiteral) {
+            return toIcebergLiteral(getField(fieldRef), flussLiteral);
+        }
+
+        /** Converts every Fluss literal in {@code literals} for the referenced column. */
+        private List<Object> convertAllToIcebergLiterals(FieldRef fieldRef, List<Object> literals) {
+            Types.NestedField field = getField(fieldRef);
+            return literals.stream()
+                    .map(literal -> toIcebergLiteral(field, literal))
+                    .collect(Collectors.toList());
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index f12b505b1..4d7582c53 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -17,7 +17,13 @@
package org.apache.fluss.lake.iceberg.utils;
+import org.apache.fluss.lake.iceberg.source.FlussRowAsIcebergRecord;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
@@ -27,6 +33,8 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import javax.annotation.Nullable;
@@ -82,4 +90,44 @@ public class IcebergConversions {
expression = Expressions.and(expression,
Expressions.equal(BUCKET_COLUMN_NAME, bucket));
return expression;
}
+
+ /**
+ * Converts a single Fluss literal into the Java value Iceberg expects for {@code field}.
+ *
+ * <p>The literal is wrapped in a one-column Fluss row. The row's Fluss type is derived from
+ * the Iceberg field type, the row is viewed through {@link FlussRowAsIcebergRecord}, and the
+ * value is read back at position 0 as {@code field.type().typeId().javaClass()}.
+ *
+ * <p>NOTE(review): Iceberg's {@code TypeID.javaClass()} is {@code Long} for TIME and
+ * TIMESTAMP. Confirm that {@code FlussRowAsIcebergRecord.get(int, Class)} returns a value of
+ * that class for time-typed fields, otherwise such literals may fail here.
+ *
+ * @param field the Iceberg column the literal is compared against
+ * @param flussLiteral the literal in Fluss internal representation
+ * @return the literal converted to the Iceberg representation for {@code field}
+ * @throws UnsupportedOperationException if the field's Iceberg type has no Fluss mapping
+ */
+ public static Object toIcebergLiteral(Types.NestedField field, Object
flussLiteral) {
+ InternalRow flussRow = GenericRow.of(flussLiteral);
+ FlussRowAsIcebergRecord flussRowAsIcebergRecord =
+ new FlussRowAsIcebergRecord(
+ Types.StructType.of(field),
+
RowType.of(convertIcebergTypeToFlussType(field.type())),
+ flussRow);
+ return flussRowAsIcebergRecord.get(0,
field.type().typeId().javaClass());
+ }
+
+ /**
+ * Converts an Iceberg data type to the corresponding Fluss data type.
+ *
+ * <p>Supported mappings: boolean, int, long (to BIGINT), double, time, timestamp (with zone
+ * to TIMESTAMP_LTZ, without zone to TIMESTAMP), string, and decimal (precision and scale
+ * preserved).
+ *
+ * <p>NOTE(review): other Iceberg types (for example float, date, binary) are not mapped yet,
+ * so a literal on such a column ends in the exception below. Confirm that callers tolerate
+ * this.
+ *
+ * @throws UnsupportedOperationException for any Iceberg type not listed above
+ */
+ private static DataType convertIcebergTypeToFlussType(Type icebergType) {
+ if (icebergType instanceof Types.BooleanType) {
+ return DataTypes.BOOLEAN();
+ } else if (icebergType instanceof Types.IntegerType) {
+ return DataTypes.INT();
+ } else if (icebergType instanceof Types.LongType) {
+ return DataTypes.BIGINT();
+ } else if (icebergType instanceof Types.DoubleType) {
+ return DataTypes.DOUBLE();
+ } else if (icebergType instanceof Types.TimeType) {
+ return DataTypes.TIME();
+ } else if (icebergType instanceof Types.TimestampType) {
+ Types.TimestampType timestampType = (Types.TimestampType)
icebergType;
+ // Iceberg "timestamptz" (adjusted to UTC) is the local-zoned variant in Fluss.
+ if (timestampType.shouldAdjustToUTC()) {
+ return DataTypes.TIMESTAMP_LTZ();
+ } else {
+ return DataTypes.TIMESTAMP();
+ }
+ } else if (icebergType instanceof Types.StringType) {
+ return DataTypes.STRING();
+ } else if (icebergType instanceof Types.DecimalType) {
+ Types.DecimalType decimalType = (Types.DecimalType) icebergType;
+ return DataTypes.DECIMAL(decimalType.precision(),
decimalType.scale());
+ }
+ throw new UnsupportedOperationException(
+ "Unsupported data type conversion for Iceberg type: "
+ + icebergType.getClass().getName());
+ }
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 06dd90eec..168f51cd2 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -123,10 +123,11 @@ public class FlinkUnionReadLogTableITCase extends
FlinkUnionReadTestBase {
String plan = batchTEnv.explainSql(sqlWithPartitionFilter);
// check if the plan contains partition filter
- // TODO: push down iceberg partition filter
+ // check filter push down
assertThat(plan)
.contains("TableSourceScan(")
- .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" +
partition + "'");
+ .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" +
partition + "'")
+ .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
List<Row> expectedFiltered =
writtenRows.stream()
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
similarity index 57%
copy from
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
copy to
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
index 55dedbeb0..0366ee62d 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
@@ -16,19 +16,14 @@
* limitations under the License.
*/
-package org.apache.fluss.lake.paimon.source;
+package org.apache.fluss.lake.iceberg.source;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.RecordReader;
import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.predicate.FieldRef;
-import org.apache.fluss.predicate.FunctionVisitor;
-import org.apache.fluss.predicate.LeafFunction;
-import org.apache.fluss.predicate.LeafPredicate;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.predicate.PredicateBuilder;
import org.apache.fluss.record.LogRecord;
-import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.IntType;
import org.apache.fluss.types.RowType;
@@ -36,75 +31,82 @@ import org.apache.fluss.types.StringType;
import org.apache.fluss.utils.CloseableIterator;
import org.apache.flink.types.Row;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.schema.Schema;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
-/** UT for {@link PaimonLakeSource}. */
-class PaimonLakeSourceTest extends PaimonSourceTestBase {
+/** Test filter push down in {@link IcebergLakeSource}. */
+class IcebergLakeSourceTest extends IcebergSourceTestBase {
private static final Schema SCHEMA =
- Schema.newBuilder()
- .column("id", org.apache.paimon.types.DataTypes.INT())
- .column("name", org.apache.paimon.types.DataTypes.STRING())
- .column("__bucket",
org.apache.paimon.types.DataTypes.INT())
- .column("__offset",
org.apache.paimon.types.DataTypes.BIGINT())
- .column("__timestamp",
org.apache.paimon.types.DataTypes.TIMESTAMP(6))
- .primaryKey("id")
- .option(CoreOptions.BUCKET.key(), "1")
- .build();
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ optional(2, "name", Types.StringType.get()),
+ required(3, "__bucket", Types.IntegerType.get()),
+ required(4, "__offset", Types.LongType.get()),
+ required(5, "__timestamp",
Types.TimestampType.withZone()));
+
+ private static final PartitionSpec PARTITION_SPEC =
+ PartitionSpec.builderFor(SCHEMA).bucket("id",
DEFAULT_BUCKET_NUM).build();
private static final PredicateBuilder FLUSS_BUILDER =
new PredicateBuilder(RowType.of(DataTypes.BIGINT(),
DataTypes.STRING()));
@BeforeAll
protected static void beforeAll() {
- PaimonSourceTestBase.beforeAll();
+ IcebergSourceTestBase.beforeAll();
}
@Test
void testWithFilters() throws Exception {
TablePath tablePath = TablePath.of("fluss", "test_filters");
- createTable(tablePath, SCHEMA);
+ createTable(tablePath, SCHEMA, PARTITION_SPEC);
// write some rows
- List<InternalRow> rows = new ArrayList<>();
+ Table table = getTable(tablePath);
+ List<Record> rows = new ArrayList<>();
for (int i = 1; i <= 4; i++) {
rows.add(
- GenericRow.of(
+ createIcebergRecord(
+ SCHEMA,
i,
- BinaryString.fromString("name" + i),
+ "name" + i,
0,
(long) i,
-
Timestamp.fromEpochMillis(System.currentTimeMillis())));
+ OffsetDateTime.now(ZoneOffset.UTC)));
}
- writeRecord(tablePath, rows);
+ writeRecord(table, rows, null, 0);
// write some rows again
+ table.refresh();
rows = new ArrayList<>();
- for (int i = 10; i <= 14; i++) {
+ for (int i = 14; i <= 16; i++) {
rows.add(
- GenericRow.of(
+ createIcebergRecord(
+ SCHEMA,
i,
- BinaryString.fromString("name" + i),
+ "name" + i,
0,
(long) i,
-
Timestamp.fromEpochMillis(System.currentTimeMillis())));
+ OffsetDateTime.now(ZoneOffset.UTC)));
}
- writeRecord(tablePath, rows);
+ writeRecord(table, rows, null, 0);
+ table.refresh();
// test all filter can be accepted
Predicate filter1 = FLUSS_BUILDER.greaterOrEqual(0, 2);
@@ -113,25 +115,23 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
FLUSS_BUILDER.startsWith(1,
org.apache.fluss.row.BinaryString.fromString("name"));
List<Predicate> allFilters = Arrays.asList(filter1, filter2, filter3);
- LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ LakeSource<IcebergSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
LakeSource.FilterPushDownResult filterPushDownResult =
lakeSource.withFilters(allFilters);
assertThat(filterPushDownResult.acceptedPredicates()).isEqualTo(allFilters);
assertThat(filterPushDownResult.remainingPredicates()).isEmpty();
// read data to verify the filters work
- List<PaimonSplit> paimonSplits = lakeSource.createPlanner(() ->
2).plan();
- assertThat(paimonSplits).hasSize(1);
- PaimonSplit paimonSplit = paimonSplits.get(0);
- // make sure we only have one data file after filter to check plan
will make use
- // of filters
- assertThat(paimonSplit.dataSplit().dataFiles()).hasSize(1);
-
- // read data with filter to mae sure the reader with filter works
properly
+ List<IcebergSplit> icebergSplits =
+ lakeSource.createPlanner(() ->
table.currentSnapshot().snapshotId()).plan();
+ assertThat(icebergSplits).hasSize(1);
+ IcebergSplit icebergSplit = icebergSplits.get(0);
+
+ // read data with filter to make sure the reader with filter works
properly
List<Row> actual = new ArrayList<>();
org.apache.fluss.row.InternalRow.FieldGetter[] fieldGetters =
org.apache.fluss.row.InternalRow.createFieldGetters(
RowType.of(new IntType(), new StringType()));
- RecordReader recordReader = lakeSource.createRecordReader(() ->
paimonSplit);
+ RecordReader recordReader = lakeSource.createRecordReader(() ->
icebergSplit);
try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
actual.addAll(
convertToFlinkRow(
@@ -142,12 +142,7 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
// test mix one unaccepted filter
Predicate nonConvertibleFilter =
- new LeafPredicate(
- new UnSupportFilterFunction(),
- DataTypes.INT(),
- 0,
- "f1",
- Collections.emptyList());
+ FLUSS_BUILDER.endsWith(1,
org.apache.fluss.row.BinaryString.fromString("name"));
allFilters = Arrays.asList(nonConvertibleFilter, filter1, filter2);
filterPushDownResult = lakeSource.withFilters(allFilters);
@@ -163,34 +158,4 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
assertThat(filterPushDownResult.remainingPredicates().toString())
.isEqualTo(allFilters.toString());
}
-
- private static class UnSupportFilterFunction extends LeafFunction {
-
- @Override
- public boolean test(DataType type, Object field, List<Object>
literals) {
- return false;
- }
-
- @Override
- public boolean test(
- DataType type,
- long rowCount,
- Object min,
- Object max,
- Long nullCount,
- List<Object> literals) {
- return false;
- }
-
- @Override
- public Optional<LeafFunction> negate() {
- return Optional.empty();
- }
-
- @Override
- public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef,
List<Object> literals) {
- throw new UnsupportedOperationException(
- "Unsupported filter function for test purpose.");
- }
- }
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverterTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverterTest.java
new file mode 100644
index 000000000..1e8bff09d
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.utils;
+
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.fluss.row.BinaryString.fromString;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test for {@link FlussToIcebergPredicateConverter}. Supported Fluss predicates must convert to
+ * the expected Iceberg expression. Unsupported ones must convert to {@link Optional#empty()}.
+ */
+class FlussToIcebergPredicateConverterTest {
+
+ // Fluss row type used to build predicates. Field i lines up with column i of ICEBERG_SCHEMA.
+ private static final PredicateBuilder FLUSS_BUILDER =
+ new PredicateBuilder(
+ RowType.builder()
+ .field("f1", DataTypes.BIGINT())
+ .field("f2", DataTypes.DOUBLE())
+ .field("f3", DataTypes.STRING())
+ .build());
+
+ // Iceberg schema equivalent to the Fluss row type above (BIGINT->long, DOUBLE->double,
+ // STRING->string), with the same column names.
+ private static final Schema ICEBERG_SCHEMA =
+ new Schema(
+ required(1, "f1", Types.LongType.get()),
+ optional(2, "f2", Types.DoubleType.get()),
+ required(3, "f3", Types.StringType.get()));
+
+ /** Pairs of (Fluss predicate, expected Iceberg expression) for supported conversions. */
+ public static Stream<Arguments> parameters() {
+ // A comprehensive set of test cases for different predicate types.
+ return Stream.of(
+ // Leaf Predicates
+ Arguments.of(FLUSS_BUILDER.equal(0, 12L),
Expressions.equal("f1", 12L)),
+ Arguments.of(
+ FLUSS_BUILDER.notEqual(2, fromString("test")),
+ Expressions.notEqual("f3", "test")),
+ Arguments.of(
+ FLUSS_BUILDER.greaterThan(1, 99.9d),
Expressions.greaterThan("f2", 99.9d)),
+ Arguments.of(
+ FLUSS_BUILDER.greaterOrEqual(0, 100L),
+ Expressions.greaterThanOrEqual("f1", 100L)),
+ Arguments.of(FLUSS_BUILDER.lessThan(1, 0.1d),
Expressions.lessThan("f2", 0.1d)),
+ Arguments.of(
+ FLUSS_BUILDER.lessOrEqual(0, 50L),
Expressions.lessThanOrEqual("f1", 50L)),
+ Arguments.of(FLUSS_BUILDER.isNull(2),
Expressions.isNull("f3")),
+ Arguments.of(FLUSS_BUILDER.isNotNull(1),
Expressions.notNull("f2")),
+ // A short IN list is expected to come out as an OR of equality expressions.
+ Arguments.of(
+ FLUSS_BUILDER.in(
+ 2,
+ Stream.of("a", "b", "c")
+ .map(BinaryString::fromString)
+ .collect(Collectors.toList())),
+ Stream.of("a", "b", "c")
+ .map(s -> (Expression) Expressions.equal("f3",
s))
+ .reduce(Expressions::or)
+ .get()),
+ // A long IN list is expected to stay a single Iceberg in(...) expression.
+ Arguments.of(
+ FLUSS_BUILDER.in(
+ 2,
+ Stream.of(
+ "a", "b", "c", "a", "b", "c",
"a", "b", "c", "a",
+ "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c", "a", "b", "c")
+ .map(BinaryString::fromString)
+ .collect(Collectors.toList())),
+ Expressions.in(
+ "f3",
+ Arrays.asList(
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b", "c",
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c"))),
+ // Likewise, a long NOT IN list is expected to map to Iceberg notIn(...).
+ Arguments.of(
+ FLUSS_BUILDER.notIn(
+ 2,
+ Stream.of(
+ "a", "b", "c", "a", "b", "c",
"a", "b", "c", "a",
+ "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c", "a", "b", "c")
+ .map(BinaryString::fromString)
+ .collect(Collectors.toList())),
+ Expressions.notIn(
+ "f3",
+ Arrays.asList(
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b", "c",
+ "a", "b", "c", "a", "b", "c", "a",
"b", "c", "a", "b",
+ "c"))),
+ Arguments.of(
+ FLUSS_BUILDER.startsWith(2, fromString("start")),
+ Expressions.startsWith("f3", "start")),
+
+ // Compound Predicates
+ Arguments.of(
+ PredicateBuilder.and(
+ FLUSS_BUILDER.equal(0, 1L),
FLUSS_BUILDER.isNotNull(2)),
+ Expressions.and(Expressions.equal("f1", 1L),
Expressions.notNull("f3"))),
+ Arguments.of(
+ PredicateBuilder.or(
+ FLUSS_BUILDER.lessThan(1, 10.0),
+ FLUSS_BUILDER.greaterThan(1, 100.0)),
+ Expressions.or(
+ Expressions.lessThan("f2", 10.0),
+ Expressions.greaterThan("f2", 100.0))),
+
+ // Nested Predicate
+ Arguments.of(
+ PredicateBuilder.and(
+ FLUSS_BUILDER.equal(2, fromString("test")),
+ PredicateBuilder.or(
+ FLUSS_BUILDER.equal(0, 1L),
+ FLUSS_BUILDER.greaterThan(1, 50.0))),
+ Expressions.and(
+ Expressions.equal("f3", "test"),
+ Expressions.or(
+ Expressions.equal("f1", 1L),
+ Expressions.greaterThan("f2",
50.0)))));
+ }
+
+ /** Every supported predicate converts, and its string form matches the expected one. */
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testPredicateConverter(Predicate flussPredicate, Expression
expectedPredicate) {
+ Expression convertedIcebergExpression =
+ FlussToIcebergPredicateConverter.convert(ICEBERG_SCHEMA,
flussPredicate).get();
+ // Compared via toString(), presumably because Iceberg expressions lack structural equals().
+
assertThat(convertedIcebergExpression.toString()).isEqualTo(expectedPredicate.toString());
+ }
+
+ /** Fluss predicates (ends-with, contains) that the converter is expected not to support. */
+ public static Stream<Arguments> parametersNotSupported() {
+ return Stream.of(
+ Arguments.of(FLUSS_BUILDER.endsWith(2, fromString("end"))),
+ Arguments.of(FLUSS_BUILDER.contains(2, fromString("mid"))));
+ }
+
+ /** Unsupported predicates must yield an empty result rather than throwing. */
+ @ParameterizedTest
+ @MethodSource("parametersNotSupported")
+ void testNotSupportedPredicateConverter(Predicate flussPredicate) {
+ assertThat(FlussToIcebergPredicateConverter.convert(ICEBERG_SCHEMA,
flussPredicate))
+ .isEqualTo(Optional.empty());
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
index 084876475..512024807 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
@@ -42,7 +42,6 @@ import static org.assertj.core.api.Assertions.assertThat;
/** UT for {@link IcebergConversions}. */
class IcebergConversionsTest {
- ;
@Test
void testToPartition(@TempDir File tempWarehouseDir) {
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
index 979356fa2..5d3e9dc36 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
@@ -77,9 +77,10 @@ public class PaimonLakeSource implements
LakeSource<PaimonSplit> {
List<Predicate> unConsumedPredicates = new ArrayList<>();
List<Predicate> consumedPredicates = new ArrayList<>();
List<org.apache.paimon.predicate.Predicate> converted = new
ArrayList<>();
+ RowType rowType = getRowType(tablePath);
for (Predicate predicate : predicates) {
Optional<org.apache.paimon.predicate.Predicate> optPredicate =
-
FlussToPaimonPredicateConverter.convert(getRowType(tablePath), predicate);
+ FlussToPaimonPredicateConverter.convert(rowType,
predicate);
if (optPredicate.isPresent()) {
consumedPredicates.add(predicate);
converted.add(optPredicate.get());
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
index 55dedbeb0..0d184fc3c 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
@@ -126,7 +126,7 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
// of filters
assertThat(paimonSplit.dataSplit().dataFiles()).hasSize(1);
- // read data with filter to mae sure the reader with filter works
properly
+ // read data with filter to make sure the reader with filter works
properly
List<Row> actual = new ArrayList<>();
org.apache.fluss.row.InternalRow.FieldGetter[] fieldGetters =
org.apache.fluss.row.InternalRow.createFieldGetters(