This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new fb6f6d59f [paimon] Paimon implement union read interfaces (#1505)
fb6f6d59f is described below
commit fb6f6d59f38fbf165b8a5553c067fa66d28407c4
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Aug 11 17:23:20 2025 +0800
[paimon] Paimon implement union read interfaces (#1505)
---
.../fluss/lake/paimon/PaimonLakeStorage.java | 6 +-
.../FlussRowAsPaimonRow.java} | 85 ++----
.../fluss/lake/paimon/source/PaimonLakeSource.java | 110 ++++++++
.../lake/paimon/source/PaimonRecordReader.java | 152 +++++++++++
.../paimon/source/PaimonSortedRecordReader.java | 64 +++++
.../fluss/lake/paimon/source/PaimonSplit.java | 64 +++++
.../lake/paimon/source/PaimonSplitPlanner.java | 96 +++++++
.../lake/paimon/source/PaimonSplitSerializer.java | 59 ++++
.../paimon/tiering/FlussRecordAsPaimonRow.java | 126 +--------
.../fluss/lake/paimon/utils/PaimonConversions.java | 15 +
.../lake/paimon/utils/PaimonRowAsFlussRow.java | 141 ++++++++++
.../lake/paimon/flink/FlinkUnionReadTestBase.java | 3 +-
.../paimon/source/FlussRowAsPaimonRowTest.java | 143 ++++++++++
.../lake/paimon/source/PaimonRecordReaderTest.java | 303 +++++++++++++++++++++
.../source/PaimonSortedRecordReaderTest.java | 212 ++++++++++++++
.../lake/paimon/source/PaimonSourceTestBase.java | 138 ++++++++++
.../lake/paimon/source/PaimonSplitPlannerTest.java | 84 ++++++
.../paimon/source/PaimonSplitSerializerTest.java | 86 ++++++
.../fluss/lake/paimon/source/PaimonSplitTest.java | 83 ++++++
.../testutils/FlinkPaimonTieringTestBase.java | 2 +-
20 files changed, 1789 insertions(+), 183 deletions(-)
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
index 03da696a0..f3eac859a 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
@@ -19,6 +19,8 @@ package com.alibaba.fluss.lake.paimon;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
+import com.alibaba.fluss.lake.paimon.source.PaimonLakeSource;
+import com.alibaba.fluss.lake.paimon.source.PaimonSplit;
import com.alibaba.fluss.lake.paimon.tiering.PaimonCommittable;
import com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory;
import com.alibaba.fluss.lake.paimon.tiering.PaimonWriteResult;
@@ -46,7 +48,7 @@ public class PaimonLakeStorage implements LakeStorage {
}
@Override
- public LakeSource<?> createLakeSource(TablePath tablePath) {
- throw new UnsupportedOperationException("Not implemented");
+ public LakeSource<PaimonSplit> createLakeSource(TablePath tablePath) {
+ return new PaimonLakeSource(paimonConfig, tablePath);
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java
similarity index 61%
copy from
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
copy to
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java
index 97d543148..ddcc5b8c3 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,9 +16,8 @@
* limitations under the License.
*/
-package com.alibaba.fluss.lake.paimon.tiering;
+package com.alibaba.fluss.lake.paimon.source;
-import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.row.TimestampLtz;
import com.alibaba.fluss.row.TimestampNtz;
@@ -32,45 +32,30 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
-import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static com.alibaba.fluss.utils.Preconditions.checkState;
+/** Adapter class for converting Fluss row to Paimon row. */
+public class FlussRowAsPaimonRow implements InternalRow {
-/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
-public class FlussRecordAsPaimonRow implements InternalRow {
+ protected com.alibaba.fluss.row.InternalRow internalRow;
+ protected final RowType tableRowType;
- // Lake table for paimon will append three system columns: __bucket,
__offset,__timestamp
- private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
- private final RowType tableTowType;
- private final int bucket;
- private LogRecord logRecord;
- private int originRowFieldCount;
- private com.alibaba.fluss.row.InternalRow internalRow;
-
- public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
- this.bucket = bucket;
- this.tableTowType = tableTowType;
+ public FlussRowAsPaimonRow(RowType tableTowType) {
+ this.tableRowType = tableTowType;
}
- public void setFlussRecord(LogRecord logRecord) {
- this.logRecord = logRecord;
- this.internalRow = logRecord.getRow();
- this.originRowFieldCount = internalRow.getFieldCount();
- checkState(
- originRowFieldCount == tableTowType.getFieldCount() -
LAKE_PAIMON_SYSTEM_COLUMNS,
- "The paimon table fields count must equals to LogRecord's
fields count.");
+ public FlussRowAsPaimonRow(
+ com.alibaba.fluss.row.InternalRow internalRow, RowType
tableTowType) {
+ this.internalRow = internalRow;
+ this.tableRowType = tableTowType;
}
@Override
public int getFieldCount() {
- return
- // business (including partitions) + system (three system fields:
bucket, offset,
- // timestamp)
- originRowFieldCount + LAKE_PAIMON_SYSTEM_COLUMNS;
+ return internalRow.getFieldCount();
}
@Override
public RowKind getRowKind() {
- return toRowKind(logRecord.getChangeType());
+ return RowKind.INSERT;
}
@Override
@@ -80,11 +65,7 @@ public class FlussRecordAsPaimonRow implements InternalRow {
@Override
public boolean isNullAt(int pos) {
- if (pos < originRowFieldCount) {
- return internalRow.isNullAt(pos);
- }
- // is the last three system fields: bucket, offset, timestamp which
are never null
- return false;
+ return internalRow.isNullAt(pos);
}
@Override
@@ -104,23 +85,11 @@ public class FlussRecordAsPaimonRow implements InternalRow
{
@Override
public int getInt(int pos) {
- if (pos == originRowFieldCount) {
- // bucket system column
- return bucket;
- }
return internalRow.getInt(pos);
}
@Override
public long getLong(int pos) {
- if (pos == originRowFieldCount + 1) {
- // offset system column
- return logRecord.logOffset();
- } else if (pos == originRowFieldCount + 2) {
- // timestamp system column
- return logRecord.timestamp();
- }
- // the origin RowData
return internalRow.getLong(pos);
}
@@ -151,13 +120,7 @@ public class FlussRecordAsPaimonRow implements InternalRow
{
@Override
public Timestamp getTimestamp(int pos, int precision) {
- // it's timestamp system column
- if (pos == originRowFieldCount + 2) {
- return Timestamp.fromEpochMillis(logRecord.timestamp());
- }
-
- DataType paimonTimestampType = tableTowType.getTypeAt(pos);
-
+ DataType paimonTimestampType = tableRowType.getTypeAt(pos);
switch (paimonTimestampType.getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
if (TimestampNtz.isCompact(precision)) {
@@ -191,7 +154,7 @@ public class FlussRecordAsPaimonRow implements InternalRow {
}
@Override
- public Variant getVariant(int pos) {
+ public Variant getVariant(int i) {
throw new UnsupportedOperationException(
"getVariant is not support for Fluss record currently.");
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
new file mode 100644
index 000000000..a41f5a2fc
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.Planner;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.predicate.Predicate;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/**
+ * Paimon lake format implementation of {@link
 com.alibaba.fluss.lake.source.LakeSource} for reading
+ * a Paimon table.
+ */
+public class PaimonLakeSource implements LakeSource<PaimonSplit> {
+
+ private final Configuration paimonConfig;
+ private final TablePath tablePath;
+
+ private @Nullable int[][] project;
+ private @Nullable org.apache.paimon.predicate.Predicate predicate;
+
+ public PaimonLakeSource(Configuration paimonConfig, TablePath tablePath) {
+ this.paimonConfig = paimonConfig;
+ this.tablePath = tablePath;
+ }
+
+ @Override
+ public void withProject(int[][] project) {
+ this.project = project;
+ }
+
+ @Override
+ public void withLimit(int limit) {
+ throw new UnsupportedOperationException("Not impl.");
+ }
+
+ @Override
+ public FilterPushDownResult withFilters(List<Predicate> predicates) {
+ return FilterPushDownResult.of(Collections.emptyList(), predicates);
+ }
+
+ @Override
+ public Planner<PaimonSplit> createPlanner(PlannerContext plannerContext) {
+ return new PaimonSplitPlanner(
+ paimonConfig, tablePath, predicate,
plannerContext.snapshotId());
+ }
+
+ @Override
+ public RecordReader createRecordReader(ReaderContext<PaimonSplit> context)
throws IOException {
+ try (Catalog catalog = getCatalog()) {
+ FileStoreTable fileStoreTable = getTable(catalog, tablePath);
+ if (fileStoreTable.primaryKeys().isEmpty()) {
+ return new PaimonRecordReader(
+ fileStoreTable, context.lakeSplit(), project,
predicate);
+ } else {
+ return new PaimonSortedRecordReader(
+ fileStoreTable, context.lakeSplit(), project,
predicate);
+ }
+ } catch (Exception e) {
+ throw new IOException("Fail to create record reader.", e);
+ }
+ }
+
+ @Override
+ public SimpleVersionedSerializer<PaimonSplit> getSplitSerializer() {
+ return new PaimonSplitSerializer();
+ }
+
+ private Catalog getCatalog() {
+ return CatalogFactory.createCatalog(
+ CatalogContext.create(Options.fromMap(paimonConfig.toMap())));
+ }
+
+ private FileStoreTable getTable(Catalog catalog, TablePath tablePath)
throws Exception {
+ return (FileStoreTable) catalog.getTable(toPaimon(tablePath));
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
new file mode 100644
index 000000000..1ace3dfcd
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.IntStream;
+
+import static
com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toChangeType;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Record reader for paimon table. */
+public class PaimonRecordReader implements RecordReader {
+
+ protected PaimonRowAsFlussRecordIterator iterator;
+ protected @Nullable int[][] project;
+ protected @Nullable Predicate predicate;
+ protected RowType paimonRowType;
+
+ public PaimonRecordReader(
+ FileStoreTable fileStoreTable,
+ PaimonSplit split,
+ @Nullable int[][] project,
+ @Nullable Predicate predicate)
+ throws IOException {
+ ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+ RowType paimonFullRowType = fileStoreTable.rowType();
+ if (project != null) {
+ readBuilder = applyProject(readBuilder, project,
paimonFullRowType);
+ }
+
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+
+ TableRead tableRead = readBuilder.newRead();
+ paimonRowType = readBuilder.readType();
+
+ org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+ tableRead.createReader(split.dataSplit());
+ iterator =
+ new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+ recordReader.toCloseableIterator(), paimonRowType);
+ }
+
+ @Override
+ public CloseableIterator<LogRecord> read() throws IOException {
+ return iterator;
+ }
+
+ private ReadBuilder applyProject(
+ ReadBuilder readBuilder, int[][] projects, RowType
paimonFullRowType) {
+ int[] projectIds = Arrays.stream(projects).mapToInt(project ->
project[0]).toArray();
+
+ int offsetFieldPos =
paimonFullRowType.getFieldIndex(OFFSET_COLUMN_NAME);
+ int timestampFieldPos =
paimonFullRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
+
+ int[] paimonProject =
+ IntStream.concat(
+ IntStream.of(projectIds),
+ IntStream.of(offsetFieldPos,
timestampFieldPos))
+ .toArray();
+
+ return readBuilder.withProjection(paimonProject);
+ }
+
+ /** Iterator for paimon row as fluss record. */
+ public static class PaimonRowAsFlussRecordIterator implements
CloseableIterator<LogRecord> {
+
+ private final org.apache.paimon.utils.CloseableIterator<InternalRow>
paimonRowIterator;
+
+ private final ProjectedRow projectedRow;
+ private final PaimonRowAsFlussRow paimonRowAsFlussRow;
+
+ private final int logOffsetColIndex;
+ private final int timestampColIndex;
+
+ public PaimonRowAsFlussRecordIterator(
+ org.apache.paimon.utils.CloseableIterator<InternalRow>
paimonRowIterator,
+ RowType paimonRowType) {
+ this.paimonRowIterator = paimonRowIterator;
+ this.logOffsetColIndex =
paimonRowType.getFieldIndex(OFFSET_COLUMN_NAME);
+ this.timestampColIndex =
paimonRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
+
+ int[] project = IntStream.range(0, paimonRowType.getFieldCount() -
2).toArray();
+ projectedRow = ProjectedRow.from(project);
+ paimonRowAsFlussRow = new PaimonRowAsFlussRow();
+ }
+
+ @Override
+ public void close() {
+ try {
+ paimonRowIterator.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to close iterator.", e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return paimonRowIterator.hasNext();
+ }
+
+ @Override
+ public LogRecord next() {
+ InternalRow paimonRow = paimonRowIterator.next();
+ ChangeType changeType = toChangeType(paimonRow.getRowKind());
+ long offset = paimonRow.getLong(logOffsetColIndex);
+ long timestamp = paimonRow.getTimestamp(timestampColIndex,
6).getMillisecond();
+
+ return new GenericRecord(
+ offset,
+ timestamp,
+ changeType,
+
projectedRow.replaceRow(paimonRowAsFlussRow.replaceRow(paimonRow)));
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java
new file mode 100644
index 000000000..281695105
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+/** Sorted record reader for primary key paimon table. */
+public class PaimonSortedRecordReader extends PaimonRecordReader implements
SortedRecordReader {
+
+ Comparator<com.alibaba.fluss.row.InternalRow> comparator;
+
+ public PaimonSortedRecordReader(
+ FileStoreTable fileStoreTable,
+ PaimonSplit split,
+ @Nullable int[][] project,
+ @Nullable Predicate predicate)
+ throws IOException {
+ super(fileStoreTable, split, project, predicate);
+ this.comparator =
+ toFlussRowComparator(
+ paimonRowType,
+ ((KeyValueFileStore)
fileStoreTable.store()).newKeyComparator());
+ }
+
+ @Override
+ public Comparator<InternalRow> order() {
+ return comparator;
+ }
+
+ private Comparator<com.alibaba.fluss.row.InternalRow> toFlussRowComparator(
+ RowType rowType, Comparator<org.apache.paimon.data.InternalRow>
paimonRowcomparator) {
+ return (row1, row2) ->
+ paimonRowcomparator.compare(
+ new FlussRowAsPaimonRow(row1, rowType),
+ new FlussRowAsPaimonRow(row2, rowType));
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java
new file mode 100644
index 000000000..b28f8b61c
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSplit;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.source.DataSplit;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Split for paimon table. */
+public class PaimonSplit implements LakeSplit {
+
+ private final DataSplit dataSplit;
+
+ public PaimonSplit(DataSplit dataSplit) {
+ this.dataSplit = dataSplit;
+ }
+
+ @Override
+ public int bucket() {
+ return dataSplit.bucket();
+ }
+
+ @Override
+ public List<String> partition() {
+ BinaryRow partition = dataSplit.partition();
+ if (partition.getFieldCount() == 0) {
+ return Collections.emptyList();
+ }
+
+ List<String> partitions = new ArrayList<>();
+ for (int i = 0; i < partition.getFieldCount(); i++) {
+            // TODO: Currently, partition columns must be of String datatype, so we
+            // can always read them as strings. Revisit here when
+            // #489 is finished.
+ partitions.add(partition.getString(i).toString());
+ }
+ return partitions;
+ }
+
+ public DataSplit dataSplit() {
+ return dataSplit;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
new file mode 100644
index 000000000..e481d8102
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.source.Planner;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Split planner for paimon table. */
+public class PaimonSplitPlanner implements Planner<PaimonSplit> {
+
+ private final Configuration paimonConfig;
+ private final TablePath tablePath;
+ private final @Nullable Predicate predicate;
+ private final long snapshotId;
+
+ public PaimonSplitPlanner(
+ Configuration paimonConfig,
+ TablePath tablePath,
+ @Nullable Predicate predicate,
+ long snapshotId) {
+ this.paimonConfig = paimonConfig;
+ this.tablePath = tablePath;
+ this.predicate = predicate;
+ this.snapshotId = snapshotId;
+ }
+
+ @Override
+ public List<PaimonSplit> plan() {
+ try {
+ List<PaimonSplit> splits = new ArrayList<>();
+ try (Catalog catalog = getCatalog()) {
+ FileStoreTable fileStoreTable = getTable(catalog, tablePath,
snapshotId);
+ // TODO: support filter .withFilter(predicate)
+ InnerTableScan tableScan = fileStoreTable.newScan();
+ for (Split split : tableScan.plan().splits()) {
+ DataSplit dataSplit = (DataSplit) split;
+ splits.add(new PaimonSplit(dataSplit));
+ }
+ }
+ return splits;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to plan splits for paimon.", e);
+ }
+ }
+
+ private Catalog getCatalog() {
+ return CatalogFactory.createCatalog(
+ CatalogContext.create(Options.fromMap(paimonConfig.toMap())));
+ }
+
+ private FileStoreTable getTable(Catalog catalog, TablePath tablePath, long
snapshotId)
+ throws Exception {
+ return (FileStoreTable)
+ catalog.getTable(toPaimon(tablePath))
+ .copy(
+ Collections.singletonMap(
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
+ String.valueOf(snapshotId)));
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java
new file mode 100644
index 000000000..112a8a3f2
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/** Serializer for paimon split. */
+public class PaimonSplitSerializer implements
SimpleVersionedSerializer<PaimonSplit> {
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(PaimonSplit paimonSplit) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper view = new
DataOutputViewStreamWrapper(out);
+ DataSplit dataSplit = paimonSplit.dataSplit();
+ InstantiationUtil.serializeObject(view, dataSplit);
+ return out.toByteArray();
+ }
+
+ @Override
+ public PaimonSplit deserialize(int version, byte[] serialized) throws
IOException {
+ ByteArrayInputStream in = new ByteArrayInputStream(serialized);
+ DataSplit dataSplit;
+ try {
+ dataSplit = InstantiationUtil.deserializeObject(in,
getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return new PaimonSplit(dataSplit);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index 97d543148..5edeb482a 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -17,18 +17,11 @@
package com.alibaba.fluss.lake.paimon.tiering;
+import com.alibaba.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.row.TimestampLtz;
-import com.alibaba.fluss.row.TimestampNtz;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.variant.Variant;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -36,19 +29,17 @@ import static
com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
import static com.alibaba.fluss.utils.Preconditions.checkState;
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
-public class FlussRecordAsPaimonRow implements InternalRow {
+public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
// Lake table for paimon will append three system columns: __bucket,
__offset,__timestamp
private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
- private final RowType tableTowType;
private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
- private com.alibaba.fluss.row.InternalRow internalRow;
public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
+ super(tableTowType);
this.bucket = bucket;
- this.tableTowType = tableTowType;
}
public void setFlussRecord(LogRecord logRecord) {
@@ -56,7 +47,7 @@ public class FlussRecordAsPaimonRow implements InternalRow {
this.internalRow = logRecord.getRow();
this.originRowFieldCount = internalRow.getFieldCount();
checkState(
- originRowFieldCount == tableTowType.getFieldCount() -
LAKE_PAIMON_SYSTEM_COLUMNS,
+ originRowFieldCount == tableRowType.getFieldCount() -
LAKE_PAIMON_SYSTEM_COLUMNS,
"The paimon table fields count must equals to LogRecord's
fields count.");
}
@@ -73,42 +64,22 @@ public class FlussRecordAsPaimonRow implements InternalRow {
return toRowKind(logRecord.getChangeType());
}
- @Override
- public void setRowKind(RowKind rowKind) {
- // do nothing
- }
-
@Override
public boolean isNullAt(int pos) {
if (pos < originRowFieldCount) {
- return internalRow.isNullAt(pos);
+ return super.isNullAt(pos);
}
// is the last three system fields: bucket, offset, timestamp which
are never null
return false;
}
- @Override
- public boolean getBoolean(int pos) {
- return internalRow.getBoolean(pos);
- }
-
- @Override
- public byte getByte(int pos) {
- return internalRow.getByte(pos);
- }
-
- @Override
- public short getShort(int pos) {
- return internalRow.getShort(pos);
- }
-
@Override
public int getInt(int pos) {
if (pos == originRowFieldCount) {
// bucket system column
return bucket;
}
- return internalRow.getInt(pos);
+ return super.getInt(pos);
}
@Override
@@ -121,32 +92,7 @@ public class FlussRecordAsPaimonRow implements InternalRow {
return logRecord.timestamp();
}
// the origin RowData
- return internalRow.getLong(pos);
- }
-
- @Override
- public float getFloat(int pos) {
- return internalRow.getFloat(pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return internalRow.getDouble(pos);
- }
-
- @Override
- public BinaryString getString(int pos) {
- return BinaryString.fromBytes(internalRow.getString(pos).toBytes());
- }
-
- @Override
- public Decimal getDecimal(int pos, int precision, int scale) {
- com.alibaba.fluss.row.Decimal flussDecimal =
internalRow.getDecimal(pos, precision, scale);
- if (flussDecimal.isCompact()) {
- return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(),
precision, scale);
- } else {
- return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(),
precision, scale);
- }
+ return super.getLong(pos);
}
@Override
@@ -155,62 +101,6 @@ public class FlussRecordAsPaimonRow implements InternalRow
{
if (pos == originRowFieldCount + 2) {
return Timestamp.fromEpochMillis(logRecord.timestamp());
}
-
- DataType paimonTimestampType = tableTowType.getTypeAt(pos);
-
- switch (paimonTimestampType.getTypeRoot()) {
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- if (TimestampNtz.isCompact(precision)) {
- return Timestamp.fromEpochMillis(
- internalRow.getTimestampNtz(pos,
precision).getMillisecond());
- } else {
- TimestampNtz timestampNtz =
internalRow.getTimestampNtz(pos, precision);
- return Timestamp.fromEpochMillis(
- timestampNtz.getMillisecond(),
timestampNtz.getNanoOfMillisecond());
- }
-
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- if (TimestampLtz.isCompact(precision)) {
- return Timestamp.fromEpochMillis(
- internalRow.getTimestampLtz(pos,
precision).getEpochMillisecond());
- } else {
- TimestampLtz timestampLtz =
internalRow.getTimestampLtz(pos, precision);
- return Timestamp.fromEpochMillis(
- timestampLtz.getEpochMillisecond(),
- timestampLtz.getNanoOfMillisecond());
- }
- default:
- throw new UnsupportedOperationException(
- "Unsupported data type to get timestamp: " +
paimonTimestampType);
- }
- }
-
- @Override
- public byte[] getBinary(int pos) {
- return internalRow.getBytes(pos);
- }
-
- @Override
- public Variant getVariant(int pos) {
- throw new UnsupportedOperationException(
- "getVariant is not support for Fluss record currently.");
- }
-
- @Override
- public InternalArray getArray(int pos) {
- throw new UnsupportedOperationException(
- "getArray is not support for Fluss record currently.");
- }
-
- @Override
- public InternalMap getMap(int pos) {
- throw new UnsupportedOperationException(
- "getMap is not support for Fluss record currently.");
- }
-
- @Override
- public InternalRow getRow(int pos, int pos1) {
- throw new UnsupportedOperationException(
- "getRow is not support for Fluss record currently.");
+ return super.getTimestamp(pos, precision);
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
index 437cf1913..9d51e787b 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
@@ -50,6 +50,21 @@ public class PaimonConversions {
}
}
+ public static ChangeType toChangeType(RowKind rowKind) {
+ switch (rowKind) {
+ case INSERT:
+ return ChangeType.INSERT;
+ case UPDATE_BEFORE:
+ return ChangeType.UPDATE_BEFORE;
+ case UPDATE_AFTER:
+ return ChangeType.UPDATE_AFTER;
+ case DELETE:
+ return ChangeType.DELETE;
+ default:
+ throw new IllegalArgumentException("Unsupported rowKind: " +
rowKind);
+ }
+ }
+
public static Identifier toPaimon(TablePath tablePath) {
return Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
new file mode 100644
index 000000000..3fc625501
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.utils;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+
+import org.apache.paimon.data.Timestamp;
+
+/** Adapter that exposes a Paimon {@code InternalRow} as a Fluss row. */
+public class PaimonRowAsFlussRow implements InternalRow {
+
+ private org.apache.paimon.data.InternalRow paimonRow;
+
+ public PaimonRowAsFlussRow() {}
+
+ public PaimonRowAsFlussRow(org.apache.paimon.data.InternalRow paimonRow) {
+ this.paimonRow = paimonRow;
+ }
+
+ public PaimonRowAsFlussRow replaceRow(org.apache.paimon.data.InternalRow
paimonRow) {
+ this.paimonRow = paimonRow;
+ return this;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return paimonRow.getFieldCount();
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return paimonRow.isNullAt(pos);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return paimonRow.getBoolean(pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return paimonRow.getByte(pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return paimonRow.getShort(pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return paimonRow.getInt(pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return paimonRow.getLong(pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return paimonRow.getFloat(pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return paimonRow.getDouble(pos);
+ }
+
+ @Override
+ public BinaryString getChar(int pos, int length) {
+ return BinaryString.fromBytes(paimonRow.getString(pos).toBytes());
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return BinaryString.fromBytes(paimonRow.getString(pos).toBytes());
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ org.apache.paimon.data.Decimal paimonDecimal =
paimonRow.getDecimal(pos, precision, scale);
+ if (paimonDecimal.isCompact()) {
+ return Decimal.fromUnscaledLong(paimonDecimal.toUnscaledLong(),
precision, scale);
+ } else {
+ return Decimal.fromBigDecimal(paimonDecimal.toBigDecimal(),
precision, scale);
+ }
+ }
+
+ @Override
+ public TimestampNtz getTimestampNtz(int pos, int precision) {
+ Timestamp timestamp = paimonRow.getTimestamp(pos, precision);
+ if (TimestampNtz.isCompact(precision)) {
+ return TimestampNtz.fromMillis(timestamp.getMillisecond());
+ } else {
+ return TimestampNtz.fromMillis(
+ timestamp.getMillisecond(),
timestamp.getNanoOfMillisecond());
+ }
+ }
+
+ @Override
+ public TimestampLtz getTimestampLtz(int pos, int precision) {
+ Timestamp timestamp = paimonRow.getTimestamp(pos, precision);
+ if (TimestampLtz.isCompact(precision)) {
+ return TimestampLtz.fromEpochMillis(timestamp.getMillisecond());
+ } else {
+ return TimestampLtz.fromEpochMillis(
+ timestamp.getMillisecond(),
timestamp.getNanoOfMillisecond());
+ }
+ }
+
+ @Override
+ public byte[] getBinary(int pos, int length) {
+ return paimonRow.getBinary(pos);
+ }
+
+ @Override
+ public byte[] getBytes(int pos) {
+ return paimonRow.getBinary(pos);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 609bf68ac..3231fe644 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -27,7 +27,8 @@ import org.junit.jupiter.api.BeforeEach;
import static com.alibaba.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
-class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
+/** Base class for Flink union read tests. */
+public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
protected static final int DEFAULT_BUCKET_NUM = 1;
StreamTableEnvironment batchTEnv;
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java
new file mode 100644
index 000000000..b20e1a5f2
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static com.alibaba.fluss.record.ChangeType.DELETE;
+import static com.alibaba.fluss.record.ChangeType.INSERT;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link FlussRowAsPaimonRow}. */
+class FlussRowAsPaimonRowTest {
+ @Test
+ void testLogTableRecordAllTypes() {
+ // Construct a FlussRowAsPaimonRow instance
+ RowType tableRowType =
+ RowType.of(
+ new org.apache.paimon.types.BooleanType(),
+ new org.apache.paimon.types.TinyIntType(),
+ new org.apache.paimon.types.SmallIntType(),
+ new org.apache.paimon.types.IntType(),
+ new org.apache.paimon.types.BigIntType(),
+ new org.apache.paimon.types.FloatType(),
+ new org.apache.paimon.types.DoubleType(),
+ new org.apache.paimon.types.VarCharType(),
+ new org.apache.paimon.types.DecimalType(5, 2),
+ new org.apache.paimon.types.DecimalType(20, 0),
+ new org.apache.paimon.types.LocalZonedTimestampType(6),
+ new org.apache.paimon.types.TimestampType(6),
+ new org.apache.paimon.types.BinaryType(),
+ new org.apache.paimon.types.VarCharType());
+
+ long logOffset = 0;
+ long timeStamp = System.currentTimeMillis();
+ GenericRow genericRow = new GenericRow(14);
+ genericRow.setField(0, true);
+ genericRow.setField(1, (byte) 1);
+ genericRow.setField(2, (short) 2);
+ genericRow.setField(3, 3);
+ genericRow.setField(4, 4L);
+ genericRow.setField(5, 5.1f);
+ genericRow.setField(6, 6.0d);
+ genericRow.setField(7, BinaryString.fromString("string"));
+ genericRow.setField(8, Decimal.fromUnscaledLong(9, 5, 2));
+ genericRow.setField(9, Decimal.fromBigDecimal(new BigDecimal(10), 20,
0));
+ genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L,
5678));
+ genericRow.setField(11, TimestampNtz.fromMillis(1698235273182L, 5678));
+ genericRow.setField(12, new byte[] {1, 2, 3, 4});
+ genericRow.setField(13, null);
+ LogRecord logRecord = new GenericRecord(logOffset, timeStamp,
APPEND_ONLY, genericRow);
+ FlussRowAsPaimonRow flussRowAsPaimonRow =
+ new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
+
+        // verify FlussRowAsPaimonRow normal columns
+ assertThat(flussRowAsPaimonRow.getBoolean(0)).isTrue();
+ assertThat(flussRowAsPaimonRow.getByte(1)).isEqualTo((byte) 1);
+ assertThat(flussRowAsPaimonRow.getShort(2)).isEqualTo((short) 2);
+ assertThat(flussRowAsPaimonRow.getInt(3)).isEqualTo(3);
+ assertThat(flussRowAsPaimonRow.getLong(4)).isEqualTo(4L);
+ assertThat(flussRowAsPaimonRow.getFloat(5)).isEqualTo(5.1f);
+ assertThat(flussRowAsPaimonRow.getDouble(6)).isEqualTo(6.0d);
+
assertThat(flussRowAsPaimonRow.getString(7).toString()).isEqualTo("string");
+ assertThat(flussRowAsPaimonRow.getDecimal(8, 5, 2).toBigDecimal())
+ .isEqualTo(new BigDecimal("0.09"));
+ assertThat(flussRowAsPaimonRow.getDecimal(9, 20, 0).toBigDecimal())
+ .isEqualTo(new BigDecimal(10));
+ assertThat(flussRowAsPaimonRow.getTimestamp(10, 6).getMillisecond())
+ .isEqualTo(1698235273182L);
+ assertThat(flussRowAsPaimonRow.getTimestamp(10,
6).getNanoOfMillisecond()).isEqualTo(5678);
+ assertThat(flussRowAsPaimonRow.getTimestamp(11, 6).getMillisecond())
+ .isEqualTo(1698235273182L);
+ assertThat(flussRowAsPaimonRow.getTimestamp(11,
6).getNanoOfMillisecond()).isEqualTo(5678);
+ assertThat(flussRowAsPaimonRow.getBinary(12)).isEqualTo(new byte[] {1,
2, 3, 4});
+ assertThat(flussRowAsPaimonRow.isNullAt(13)).isTrue();
+ }
+
+ @Test
+ void testPrimaryKeyTableRecord() {
+ RowType tableRowType =
+ RowType.of(
+ new org.apache.paimon.types.IntType(),
+ new org.apache.paimon.types.BooleanType());
+
+ long logOffset = 0;
+ long timeStamp = System.currentTimeMillis();
+ GenericRow genericRow = new GenericRow(2);
+ genericRow.setField(0, 10);
+ genericRow.setField(1, true);
+
+ LogRecord logRecord = new GenericRecord(logOffset, timeStamp, INSERT,
genericRow);
+ FlussRowAsPaimonRow flussRowAsPaimonRow =
+ new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
+
+ assertThat(flussRowAsPaimonRow.getInt(0)).isEqualTo(10);
+ // verify rowkind
+ assertThat(flussRowAsPaimonRow.getRowKind()).isEqualTo(RowKind.INSERT);
+
+ logRecord = new GenericRecord(logOffset, timeStamp, UPDATE_BEFORE,
genericRow);
+ assertThat(new FlussRowAsPaimonRow(logRecord.getRow(),
tableRowType).getRowKind())
+ .isEqualTo(RowKind.INSERT);
+
+ logRecord = new GenericRecord(logOffset, timeStamp, UPDATE_AFTER,
genericRow);
+ assertThat(new FlussRowAsPaimonRow(logRecord.getRow(),
tableRowType).getRowKind())
+ .isEqualTo(RowKind.INSERT);
+
+ logRecord = new GenericRecord(logOffset, timeStamp, DELETE,
genericRow);
+ assertThat(new FlussRowAsPaimonRow(logRecord.getRow(),
tableRowType).getRowKind())
+ .isEqualTo(RowKind.INSERT);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
new file mode 100644
index 000000000..eb9b7a1ca
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonRecordReader}. */
+class PaimonRecordReaderTest extends PaimonSourceTestBase {
+ @BeforeAll
+ protected static void beforeAll() {
+ PaimonSourceTestBase.beforeAll();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testReadLogTable(boolean isPartitioned) throws Exception {
+ // first of all, create table and prepare data
+ String tableName = "logTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+ List<InternalRow> writtenRows = new ArrayList<>();
+ prepareLogTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM,
writtenRows);
+
+ LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ Table table = getTable(tablePath);
+ Snapshot snapshot = table.latestSnapshot().get();
+ List<PaimonSplit> paimonSplits =
lakeSource.createPlanner(snapshot::id).plan();
+
+ List<Split> splits = ((FileStoreTable)
table).newScan().plan().splits();
+ assertThat(splits).hasSize(paimonSplits.size());
+ assertThat(splits)
+ .isEqualTo(
+ paimonSplits.stream()
+ .map(PaimonSplit::dataSplit)
+ .collect(Collectors.toList()));
+
+ List<Row> actual = new ArrayList<>();
+
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRow.createFieldGetters(getFlussRowType(isPartitioned));
+ for (PaimonSplit paimonSplit : paimonSplits) {
+ RecordReader recordReader = lakeSource.createRecordReader(() ->
paimonSplit);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ actual.addAll(
+ convertToFlinkRow(
+ fieldGetters,
+ TransformingCloseableIterator.transform(iterator,
LogRecord::getRow)));
+ iterator.close();
+ }
+ List<Row> expectRows =
+ convertToFlinkRow(fieldGetters,
CloseableIterator.wrap(writtenRows.iterator()));
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows);
+ }
+
+ @Test
+ void testReadLogTableWithProject() throws Exception {
+ // first of all, create table and prepare data
+ String tableName = "logTable_non_partitioned";
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+ List<InternalRow> writtenRows = new ArrayList<>();
+ prepareLogTable(tablePath, false, DEFAULT_BUCKET_NUM, writtenRows);
+
+ LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ lakeSource.withProject(new int[][] {new int[] {5}, new int[] {1}, new
int[] {3}});
+ Table table = getTable(tablePath);
+ Snapshot snapshot = table.latestSnapshot().get();
+ List<PaimonSplit> paimonSplits =
lakeSource.createPlanner(snapshot::id).plan();
+
+ List<Row> actual = new ArrayList<>();
+
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRow.createFieldGetters(
+ RowType.of(new FloatType(), new TinyIntType(), new
IntType()));
+ for (PaimonSplit paimonSplit : paimonSplits) {
+ RecordReader recordReader = lakeSource.createRecordReader(() ->
paimonSplit);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ actual.addAll(
+ convertToFlinkRow(
+ fieldGetters,
+ TransformingCloseableIterator.transform(iterator,
LogRecord::getRow)));
+ iterator.close();
+ }
+ List<Row> expectRows = new ArrayList<>();
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new
int[] {5, 1, 3});
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ try
(org.apache.paimon.reader.RecordReader<org.apache.paimon.data.InternalRow>
+ recordReader = readBuilder.newRead().createReader(splits)) {
+
org.apache.paimon.utils.CloseableIterator<org.apache.paimon.data.InternalRow>
+ closeableIterator = recordReader.toCloseableIterator();
+ PaimonRowAsFlussRow paimonRowAsFlussRow = new
PaimonRowAsFlussRow();
+ expectRows.addAll(
+ convertToFlinkRow(
+ fieldGetters,
+ TransformingCloseableIterator.transform(
+ CloseableIterator.wrap(closeableIterator),
+ paimonRowAsFlussRow::replaceRow)));
+ }
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows);
+ }
+
+ private RowType getFlussRowType(boolean isPartitioned) {
+ return isPartitioned
+ ? RowType.of(
+ new BooleanType(),
+ new TinyIntType(),
+ new SmallIntType(),
+ new IntType(),
+ new BigIntType(),
+ new FloatType(),
+ new DoubleType(),
+ new StringType(),
+ new DecimalType(5, 2),
+ new DecimalType(20, 0),
+ new LocalZonedTimestampType(3),
+ new LocalZonedTimestampType(6),
+ new TimestampType(3),
+ new TimestampType(6),
+ new BinaryType(4),
+ new StringType())
+ : RowType.of(
+ new BooleanType(),
+ new TinyIntType(),
+ new SmallIntType(),
+ new IntType(),
+ new BigIntType(),
+ new FloatType(),
+ new DoubleType(),
+ new StringType(),
+ new DecimalType(5, 2),
+ new DecimalType(20, 0),
+ new LocalZonedTimestampType(3),
+ new LocalZonedTimestampType(6),
+ new TimestampType(3),
+ new TimestampType(6),
+ new BinaryType(4));
+ }
+
+ private void prepareLogTable(
+ TablePath tablePath, boolean isPartitioned, int bucketNum,
List<InternalRow> rows)
+ throws Exception {
+ createFullTypeLogTable(tablePath, isPartitioned, bucketNum);
+ rows.addAll(writeFullTypeRows(tablePath, 10, isPartitioned ? "test" :
null));
+ }
+
+ private void createFullTypeLogTable(TablePath tablePath, boolean
isPartitioned, int bucketNum)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("f_boolean", DataTypes.BOOLEAN())
+ .column("f_byte", DataTypes.TINYINT())
+ .column("f_short", DataTypes.SMALLINT())
+ .column("f_int", DataTypes.INT())
+ .column("f_long", DataTypes.BIGINT())
+ .column("f_float", DataTypes.FLOAT())
+ .column("f_double", DataTypes.DOUBLE())
+ .column("f_string", DataTypes.STRING())
+ .column("f_decimal1", DataTypes.DECIMAL(5, 2))
+ .column("f_decimal2", DataTypes.DECIMAL(20, 0))
+ .column("f_timestamp_ltz1",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+ .column("f_timestamp_ltz2",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
+ .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
+ .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
+ .column("f_binary", DataTypes.BINARY(4));
+
+ if (isPartitioned) {
+ schemaBuilder.column("p", DataTypes.STRING());
+ schemaBuilder.partitionKeys("p");
+ schemaBuilder.option(CoreOptions.BUCKET.key(),
String.valueOf(bucketNum));
+ schemaBuilder.option(CoreOptions.BUCKET_KEY.key(), "f_long");
+ }
+ schemaBuilder
+ .column("__bucket", DataTypes.INT())
+ .column("__offset", DataTypes.BIGINT())
+ .column("__timestamp", DataTypes.TIMESTAMP(6));
+ createTable(tablePath, schemaBuilder.build());
+ }
+
+ private List<InternalRow> writeFullTypeRows(
+ TablePath tablePath, int rowCount, @Nullable String partition)
throws Exception {
+ List<org.apache.paimon.data.InternalRow> rows = new ArrayList<>();
+ List<InternalRow> flussRows = new ArrayList<>();
+ Table table = getTable(tablePath);
+
+ for (int i = 0; i < rowCount; i++) {
+ if (partition == null) {
+ com.alibaba.fluss.row.GenericRow row =
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ i + 400L,
+ 500.1f,
+ 600.0d,
+ com.alibaba.fluss.row.BinaryString.fromString(
+ "another_string_" + i),
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L,
7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ 0,
+ (long) i,
+
TimestampNtz.fromMillis(System.currentTimeMillis()));
+ rows.add(new FlussRowAsPaimonRow(row, table.rowType()));
+ flussRows.add(row);
+ } else {
+ com.alibaba.fluss.row.GenericRow row =
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ i + 400L,
+ 500.1f,
+ 600.0d,
+ com.alibaba.fluss.row.BinaryString.fromString(
+ "another_string_" + i),
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L,
7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+
com.alibaba.fluss.row.BinaryString.fromString(partition),
+ 0,
+ (long) i,
+
TimestampNtz.fromMillis(System.currentTimeMillis()));
+ rows.add(new FlussRowAsPaimonRow(row, table.rowType()));
+ flussRows.add(row);
+ }
+ }
+ writeRecord(tablePath, rows);
+ return flussRows;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
new file mode 100644
index 000000000..6efc8706c
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSortedRecordReader}. */
+class PaimonSortedRecordReaderTest extends PaimonSourceTestBase {
+ @BeforeAll
+ protected static void beforeAll() {
+ PaimonSourceTestBase.beforeAll();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testReadPkTable(boolean isPartitioned) throws Exception {
+ // first of all, create table and prepare data
+ String tableName = "pkTable_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+ List<InternalRow> writtenRows = new ArrayList<>();
+ preparePkTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM,
writtenRows);
+
+ LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ Table table = getTable(tablePath);
+ Snapshot snapshot = table.latestSnapshot().get();
+ List<PaimonSplit> paimonSplits =
lakeSource.createPlanner(snapshot::id).plan();
+
+ for (PaimonSplit paimonSplit : paimonSplits) {
+ RecordReader recordReader = lakeSource.createRecordReader(() ->
paimonSplit);
+
assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
+ CloseableIterator<LogRecord> iterator = recordReader.read();
+ assertThat(
+ isSorted(
+ TransformingCloseableIterator.transform(
+ iterator, LogRecord::getRow),
+ ((SortedRecordReader)
recordReader).order()))
+ .isTrue();
+ iterator.close();
+ }
+ }
+
+ private static <T> boolean isSorted(Iterator<T> iterator, Comparator<?
super T> comparator) {
+ if (!iterator.hasNext()) {
+ return true;
+ }
+
+ T previous = iterator.next();
+ while (iterator.hasNext()) {
+ T current = iterator.next();
+ if (comparator.compare(previous, current) > 0) {
+ return false;
+ }
+ previous = current;
+ }
+ return true;
+ }
+
+ private void preparePkTable(
+ TablePath tablePath, boolean isPartitioned, int bucketNum,
List<InternalRow> rows)
+ throws Exception {
+ createFullTypePkTable(tablePath, isPartitioned, bucketNum);
+ rows.addAll(writeFullTypeRows(tablePath, 10, isPartitioned ? "test" :
null));
+ }
+
+ private void createFullTypePkTable(TablePath tablePath, boolean
isPartitioned, int bucketNum)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("f_int", DataTypes.INT())
+ .column("f_boolean", DataTypes.BOOLEAN())
+ .column("f_byte", DataTypes.TINYINT())
+ .column("f_short", DataTypes.SMALLINT())
+ .column("f_long", DataTypes.BIGINT())
+ .column("f_float", DataTypes.FLOAT())
+ .column("f_double", DataTypes.DOUBLE())
+ .column("f_string", DataTypes.STRING())
+ .column("f_decimal1", DataTypes.DECIMAL(5, 2))
+ .column("f_decimal2", DataTypes.DECIMAL(20, 0))
+ .column("f_timestamp_ltz1",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+ .column("f_timestamp_ltz2",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
+ .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
+ .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
+ .column("f_binary", DataTypes.BINARY(4));
+
+ if (isPartitioned) {
+ schemaBuilder.column("p", DataTypes.STRING());
+ schemaBuilder.partitionKeys("p");
+ schemaBuilder.primaryKey("f_int", "p");
+ schemaBuilder.option(CoreOptions.BUCKET.key(),
String.valueOf(bucketNum));
+ schemaBuilder.option(CoreOptions.BUCKET_KEY.key(), "f_int");
+ } else {
+ schemaBuilder.primaryKey("f_int");
+ schemaBuilder.option(CoreOptions.BUCKET.key(),
String.valueOf(bucketNum));
+ schemaBuilder.option(CoreOptions.BUCKET_KEY.key(), "f_int");
+ }
+ schemaBuilder
+ .column("__bucket", DataTypes.INT())
+ .column("__offset", DataTypes.BIGINT())
+ .column("__timestamp", DataTypes.TIMESTAMP(6));
+ createTable(tablePath, schemaBuilder.build());
+ }
+
+ private List<InternalRow> writeFullTypeRows(
+ TablePath tablePath, int rowCount, @Nullable String partition)
throws Exception {
+ List<org.apache.paimon.data.InternalRow> rows = new ArrayList<>();
+ List<InternalRow> flussRows = new ArrayList<>();
+ Table table = getTable(tablePath);
+
+ for (int i = 0; i < rowCount; i++) {
+ if (partition == null) {
+ com.alibaba.fluss.row.GenericRow row =
+ row(
+ i + 30,
+ true,
+ (byte) 100,
+ (short) 200,
+ 400L,
+ 500.1f,
+ 600.0d,
+ com.alibaba.fluss.row.BinaryString.fromString(
+ "another_string_" + i),
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L,
7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ 0,
+ (long) i,
+
TimestampNtz.fromMillis(System.currentTimeMillis()));
+ rows.add(new FlussRowAsPaimonRow(row, table.rowType()));
+ flussRows.add(row);
+ } else {
+ com.alibaba.fluss.row.GenericRow row =
+ row(
+ i + 30,
+ true,
+ (byte) 100,
+ (short) 200,
+ 400L,
+ 500.1f,
+ 600.0d,
+ com.alibaba.fluss.row.BinaryString.fromString(
+ "another_string_" + i),
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L,
7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+
com.alibaba.fluss.row.BinaryString.fromString(partition),
+ 0,
+ (long) i,
+
TimestampNtz.fromMillis(System.currentTimeMillis()));
+ rows.add(new FlussRowAsPaimonRow(row, table.rowType()));
+ flussRows.add(row);
+ }
+ }
+ writeRecord(tablePath, rows);
+ return flussRows;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
new file mode 100644
index 000000000..93c73d329
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.paimon.PaimonLakeStorage;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Base class for paimon lakehouse test. */
+class PaimonSourceTestBase {
+ protected static final String DEFAULT_DB = "fluss_lakehouse";
+ protected static final String DEFAULT_TABLE = "test_lakehouse_table";
+ protected static final int DEFAULT_BUCKET_NUM = 1;
+
+ private static @TempDir File tempWarehouseDir;
+ protected static PaimonLakeStorage lakeStorage;
+ protected static Catalog paimonCatalog;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ Configuration configuration = new Configuration();
+ configuration.setString("type", "paimon");
+ configuration.setString("warehouse", tempWarehouseDir.toString());
+ lakeStorage = new PaimonLakeStorage(configuration);
+ paimonCatalog =
+ CatalogFactory.createCatalog(
+
CatalogContext.create(Options.fromMap(configuration.toMap())));
+ }
+
+ public void createTable(TablePath tablePath, Schema schema) throws
Exception {
+ paimonCatalog.createDatabase(tablePath.getDatabaseName(), true);
+ paimonCatalog.createTable(toPaimon(tablePath), schema, true);
+ }
+
+ public void writeRecord(TablePath tablePath, List<InternalRow> records)
throws Exception {
+ Table table = getTable(tablePath);
+ BatchWriteBuilder writeBuilder =
table.newBatchWriteBuilder().withOverwrite();
+ try (BatchTableWrite writer = writeBuilder.newWrite()) {
+ for (InternalRow record : records) {
+ writer.write(record);
+ }
+ List<CommitMessage> messages = writer.prepareCommit();
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ commit.commit(messages);
+ }
+ }
+ }
+
+ public Table getTable(TablePath tablePath) throws Exception {
+ return paimonCatalog.getTable(toPaimon(tablePath));
+ }
+
+ public static List<Row> convertToFlinkRow(
+ com.alibaba.fluss.row.InternalRow.FieldGetter[] fieldGetters,
+ CloseableIterator<com.alibaba.fluss.row.InternalRow>
flussRowIterator) {
+ List<Row> rows = new ArrayList<>();
+ while (flussRowIterator.hasNext()) {
+ com.alibaba.fluss.row.InternalRow row = flussRowIterator.next();
+ Row flinkRow = new Row(fieldGetters.length);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ flinkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+ }
+ rows.add(flinkRow);
+ }
+ return rows;
+ }
+
+ /** Adapter for transforming closeable iterator. */
+ public static class TransformingCloseableIterator<T, U> implements
CloseableIterator<U> {
+ private final CloseableIterator<T> source;
+ private final Function<? super T, ? extends U> transformer;
+
+ public TransformingCloseableIterator(
+ CloseableIterator<T> source, Function<? super T, ? extends U>
transformer) {
+ this.source = source;
+ this.transformer = transformer;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return source.hasNext();
+ }
+
+ @Override
+ public U next() {
+ return transformer.apply(source.next());
+ }
+
+ @Override
+ public void close() {
+ source.close();
+ }
+
+ public static <T, U> CloseableIterator<U> transform(
+ CloseableIterator<T> source, Function<? super T, ? extends U>
transformer) {
+ return new TransformingCloseableIterator<>(source, transformer);
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
new file mode 100644
index 000000000..0d9e509fe
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSplitPlanner}. */
+class PaimonSplitPlannerTest extends PaimonSourceTestBase {
+ @Test
+ void testPlan() throws Exception {
+ // prepare paimon table
+ int bucketNum = 2;
+ TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .column("c3", DataTypes.STRING());
+ builder.partitionKeys("c3");
+ builder.primaryKey("c1", "c3");
+ builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+ builder.option(CoreOptions.BUCKET_KEY.key(), "c1");
+ createTable(tablePath, builder.build());
+ Table table =
+ paimonCatalog.getTable(
+ Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName()));
+
+ GenericRow record1 =
+ GenericRow.of(12, BinaryString.fromString("a"),
BinaryString.fromString("A"));
+ GenericRow record2 =
+ GenericRow.of(13, BinaryString.fromString("a"),
BinaryString.fromString("A"));
+ writeRecord(tablePath, Arrays.asList(record1, record2));
+ Snapshot snapshot = table.latestSnapshot().get();
+
+ LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ List<PaimonSplit> paimonSplits =
lakeSource.createPlanner(snapshot::id).plan();
+
+ List<Split> actualSplits = ((FileStoreTable)
table).newScan().plan().splits();
+
+ assertThat(actualSplits).hasSize(paimonSplits.size());
+ assertThat(actualSplits)
+ .isEqualTo(
+ paimonSplits.stream()
+ .map(PaimonSplit::dataSplit)
+ .collect(Collectors.toList()));
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
new file mode 100644
index 000000000..9f07509be
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test case for {@link PaimonSplitSerializer}. */
+class PaimonSplitSerializerTest extends PaimonSourceTestBase {
+ private final PaimonSplitSerializer serializer = new
PaimonSplitSerializer();
+
+ @Test
+ void testSerializeAndDeserialize() throws Exception {
+ // prepare paimon table
+ int bucketNum = 1;
+ TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .column("c3", DataTypes.STRING());
+ builder.partitionKeys("c3");
+ builder.primaryKey("c1", "c3");
+ builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+ createTable(tablePath, builder.build());
+ Table table =
+ paimonCatalog.getTable(
+ Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName()));
+
+ GenericRow record1 =
+ GenericRow.of(12, BinaryString.fromString("a"),
BinaryString.fromString("A"));
+ writeRecord(tablePath, Arrays.asList(record1));
+ Snapshot snapshot = table.latestSnapshot().get();
+
+ LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ List<PaimonSplit> plan = lakeSource.createPlanner(snapshot::id).plan();
+
+ PaimonSplit originalPaimonSplit = plan.get(0);
+ byte[] serialized = serializer.serialize(originalPaimonSplit);
+ PaimonSplit deserialized =
serializer.deserialize(serializer.getVersion(), serialized);
+
+
assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit());
+ }
+
+ @Test
+ void testDeserializeWithInvalidData() {
+ byte[] invalidData = "invalid".getBytes();
+ assertThatThrownBy(() -> serializer.deserialize(1, invalidData))
+ .isInstanceOf(IOException.class);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
new file mode 100644
index 000000000..b0139d1d8
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSplit}. */
+class PaimonSplitTest extends PaimonSourceTestBase {
+
+ @Test
+ void testPaimonSplit() throws Exception {
+ // prepare paimon table
+ int bucketNum = 1;
+ TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .column("c3", DataTypes.STRING());
+ builder.partitionKeys("c3");
+ builder.primaryKey("c1", "c3");
+ builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+ createTable(tablePath, builder.build());
+ Table table =
+ paimonCatalog.getTable(
+ Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName()));
+
+ GenericRow record1 =
+ GenericRow.of(12, BinaryString.fromString("a"),
BinaryString.fromString("A"));
+ writeRecord(tablePath, Collections.singletonList(record1));
+ Snapshot snapshot = table.latestSnapshot().get();
+
+ LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ List<PaimonSplit> paimonSplits =
lakeSource.createPlanner(snapshot::id).plan();
+
+ // test bucket() and partition() method
+ PaimonSplit paimonSplit = paimonSplits.get(0);
+
assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("A"));
+
+ List<Split> actualSplits = ((FileStoreTable)
table).newScan().plan().splits();
+ assertThat(actualSplits.size()).isEqualTo(paimonSplits.size());
+ Split actualSplit = actualSplits.get(0);
+ assertThat(actualSplit).isEqualTo(paimonSplit.dataSplit());
+ assertThat(((DataSplit)
actualSplit).bucket()).isEqualTo(paimonSplit.bucket());
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index d9c0bebd5..8abb04d35 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -91,7 +91,7 @@ public class FlinkPaimonTieringTestBase {
protected static Connection conn;
protected static Admin admin;
protected static Configuration clientConf;
- private static String warehousePath;
+ protected static String warehousePath;
protected static Catalog paimonCatalog;
private static Configuration initConfig() {