This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2db2b8fa13 [core] Code Refactor for Data Evolution
2db2b8fa13 is described below
commit 2db2b8fa136c806ae716a60f723630613093c15e
Author: JingsongLi <[email protected]>
AuthorDate: Sat Aug 9 10:56:18 2025 +0800
[core] Code Refactor for Data Evolution
---
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../java/org/apache/paimon/schema/TableSchema.java | 16 +--
...ileReader.java => DataEvolutionFileReader.java} | 33 +++---
.../org/apache/paimon/AppendOnlyFileStore.java | 8 +-
.../org/apache/paimon/append/AppendOnlyWriter.java | 8 +-
.../paimon/operation/BaseAppendFileStoreWrite.java | 13 +-
...eSplitRead.java => DataEvolutionSplitRead.java} | 12 +-
.../org/apache/paimon/schema/SchemaValidation.java | 2 +-
.../paimon/table/AppendOnlyFileStoreTable.java | 25 +++-
.../apache/paimon/table/sink/BatchTableWrite.java | 5 +-
.../paimon/table/source/AppendTableRead.java | 36 ++----
.../paimon/table/source/KeyValueTableRead.java | 16 +--
.../AppendTableRawFileSplitReadProvider.java | 7 +-
...er.java => DataEvolutionSplitReadProvider.java} | 54 +++++----
.../IncrementalChangelogReadProvider.java | 15 +--
.../splitread/IncrementalDiffReadProvider.java | 15 +--
.../splitread/MergeFieldSplitReadProvider.java | 78 ------------
.../splitread/MergeFileSplitReadProvider.java | 15 +--
.../PrimaryKeyTableRawFileSplitReadProvider.java | 7 +-
.../source/splitread/RawFileSplitReadProvider.java | 16 +--
...SplitReadProvider.java => SplitReadConfig.java} | 11 +-
.../table/source/splitread/SplitReadProvider.java | 5 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 2 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 2 +-
...eFieldTest.java => DataEvolutionTableTest.java} | 13 +-
.../DataEvolutionSplitReadProviderTest.java | 132 +++++++++++++++++++++
.../AppendPreCommitCompactWorkerOperator.java | 2 +-
27 files changed, 288 insertions(+), 262 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index a464e6dba6..32eff542a4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2901,7 +2901,7 @@ public class CoreOptions implements Serializable {
return options.get(ROW_TRACKING_ENABLED);
}
- public boolean dataElolutionEnabled() {
+ public boolean dataEvolutionEnabled() {
return options.get(DATA_EVOLUTION_ENABLED);
}
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index 0680a65675..6e012c016b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -274,23 +274,11 @@ public class TableSchema implements Serializable {
if (writeCols == null || writeCols.isEmpty()) {
return this;
}
- Map<String, DataField> fieldMap =
- fields.stream()
- .collect(Collectors.toMap(DataField::name, field -> field, (a, b) -> a));
- List<DataField> fields = new ArrayList<>();
- for (String fieldId : writeCols) {
- DataField dataField = fieldMap.get(fieldId);
- if (dataField == null) {
- throw new RuntimeException(
- String.format(
- "Projecting field %s, but not found in schema
%s.", fieldId, this));
- }
- fields.add(dataField);
- }
+
return new TableSchema(
version,
id,
- fields,
+ new RowType(fields).project(writeCols).getFields(),
highestFieldId,
partitionKeys,
primaryKeys,
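
The removed loop above is replaced by a single RowType#project call. A minimal sketch of that projection behavior, with an illustrative three-column schema (not taken from this diff):

    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.Arrays;

    public class ProjectionSketch {
        public static void main(String[] args) {
            RowType rowType =
                    new RowType(
                            Arrays.asList(
                                    new DataField(0, "id", DataTypes.INT()),
                                    new DataField(1, "name", DataTypes.STRING()),
                                    new DataField(2, "price", DataTypes.DOUBLE())));
            // project keeps only the requested columns, in the requested order,
            // which is what the new TableSchema#project delegates to.
            RowType projected = rowType.project(Arrays.asList("price", "id"));
            System.out.println(projected.getFieldNames()); // [price, id]
        }
    }
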
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
similarity index 89%
rename from paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java
rename to paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
index 65748f5161..9c75f0e114 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
@@ -59,13 +59,13 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
*
* </pre>
*/
-public class CompoundFileReader implements RecordReader<InternalRow> {
+public class DataEvolutionFileReader implements RecordReader<InternalRow> {
private final int[] rowOffsets;
private final int[] fieldOffsets;
private final RecordReader<InternalRow>[] innerReaders;
- public CompoundFileReader(
+ public DataEvolutionFileReader(
int[] rowOffsets, int[] fieldOffsets, RecordReader<InternalRow>[] readers) {
checkArgument(rowOffsets != null, "Row offsets must not be null");
checkArgument(fieldOffsets != null, "Field offsets must not be null");
@@ -82,17 +82,17 @@ public class CompoundFileReader implements RecordReader<InternalRow> {
@Override
@Nullable
public RecordIterator<InternalRow> readBatch() throws IOException {
- CompoundRecordIterator compundFileRecordIterator =
- new CompoundRecordIterator(innerReaders.length, rowOffsets, fieldOffsets);
+ DataEvolutionIterator iterator =
+ new DataEvolutionIterator(innerReaders.length, rowOffsets, fieldOffsets);
for (int i = 0; i < innerReaders.length; i++) {
RecordIterator<InternalRow> batch = innerReaders[i].readBatch();
if (batch == null && !(innerReaders[i] instanceof EmptyFileRecordReader)) {
return null;
}
- compundFileRecordIterator.compound(i, batch);
+ iterator.set(i, batch);
}
- return compundFileRecordIterator;
+ return iterator;
}
@Override
@@ -105,20 +105,21 @@ public class CompoundFileReader implements RecordReader<InternalRow> {
}
/** The batch which is made up by several batches. */
- public static class CompoundRecordIterator implements RecordIterator<InternalRow> {
+ private static class DataEvolutionIterator implements RecordIterator<InternalRow> {
- private final CompundInternalRow compundInternalRow;
+ private final DataEvolutionRow dataEvolutionRow;
private final RecordIterator<InternalRow>[] iterators;
- public CompoundRecordIterator(
+ private DataEvolutionIterator(
int rowNumber,
int[] rowOffsets,
int[] fieldOffsets) { // Initialize with empty arrays, will be set later
- this.compundInternalRow = new CompundInternalRow(rowNumber, rowOffsets, fieldOffsets);
+ this.dataEvolutionRow = new DataEvolutionRow(rowNumber, rowOffsets, fieldOffsets);
+ //noinspection unchecked
this.iterators = new RecordIterator[rowNumber];
}
- public void compound(int i, RecordIterator<InternalRow> iterator) {
+ public void set(int i, RecordIterator<InternalRow> iterator) {
iterators[i] = iterator;
}
@@ -131,10 +132,10 @@ public class CompoundFileReader implements RecordReader<InternalRow> {
if (next == null) {
return null;
}
- compundInternalRow.setRow(i, next);
+ dataEvolutionRow.setRow(i, next);
}
}
- return compundInternalRow;
+ return dataEvolutionRow;
}
@Override
@@ -148,19 +149,19 @@ public class CompoundFileReader implements RecordReader<InternalRow> {
}
/** The row which is made up by several rows. */
- public static class CompundInternalRow implements InternalRow {
+ private static class DataEvolutionRow implements InternalRow {
private final InternalRow[] rows;
private final int[] rowOffsets;
private final int[] fieldOffsets;
- public CompundInternalRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
+ private DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
this.rows = new InternalRow[rowNumber];
this.rowOffsets = rowOffsets;
this.fieldOffsets = fieldOffsets;
}
- public void setRow(int pos, InternalRow row) {
+ private void setRow(int pos, InternalRow row) {
if (pos >= rows.length) {
throw new IndexOutOfBoundsException(
"Position " + pos + " is out of bounds for rows size "
+ rows.length);
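
For orientation: the reader above stitches one logical row out of several per-file readers, where rowOffsets[i] names the source reader for output field i and fieldOffsets[i] the position inside that source row. A runnable sketch of the offset contract (the arrays below are illustrative, not from this commit):

    public class OffsetContractSketch {
        public static void main(String[] args) {
            // Output field i is resolved as rows[rowOffsets[i]].get...(fieldOffsets[i]).
            int[] rowOffsets = {0, 0, 1};   // fields 0-1 come from reader 0, field 2 from reader 1
            int[] fieldOffsets = {0, 1, 0}; // positions inside each source row
            for (int i = 0; i < rowOffsets.length; i++) {
                System.out.printf(
                        "output field %d <- reader %d, field %d%n",
                        i, rowOffsets[i], fieldOffsets[i]);
            }
        }
    }
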
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 7f0a6b5f19..be3d1662c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -27,7 +27,7 @@ import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.operation.BucketSelectConverter;
import org.apache.paimon.operation.BucketedAppendFileStoreWrite;
-import org.apache.paimon.operation.FieldMergeSplitRead;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
@@ -86,12 +86,12 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
options.rowTrackingEnabled());
}
- public FieldMergeSplitRead newFieldMergeRead() {
- if (!options.dataElolutionEnabled()) {
+ public DataEvolutionSplitRead newDataEvolutionRead() {
+ if (!options.dataEvolutionEnabled()) {
throw new IllegalStateException(
"Field merge read is only supported when
data-evolution.enabled is true.");
}
- return new FieldMergeSplitRead(
+ return new DataEvolutionSplitRead(
fileIO,
schemaManager,
schema,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 646ef382a4..f8a074cab8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -69,8 +69,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final long schemaId;
private final FileFormat fileFormat;
private final long targetFileSize;
- private final RowType rowType;
private final RowType writeSchema;
+ @Nullable private final List<String> writeCols;
private final DataFilePathFactory pathFactory;
private final CompactManager compactManager;
private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead;
@@ -99,8 +99,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
long schemaId,
FileFormat fileFormat,
long targetFileSize,
- RowType rowType,
RowType writeSchema,
+ @Nullable List<String> writeCols,
long maxSequenceNumber,
CompactManager compactManager,
IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead,
@@ -120,8 +120,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
this.schemaId = schemaId;
this.fileFormat = fileFormat;
this.targetFileSize = targetFileSize;
- this.rowType = rowType;
this.writeSchema = writeSchema;
+ this.writeCols = writeCols;
this.pathFactory = pathFactory;
this.compactManager = compactManager;
this.dataFileRead = dataFileRead;
@@ -304,7 +304,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
FileSource.APPEND,
asyncFileWrite,
statsDenseStore,
- writeSchema.equals(rowType) ? null : writeSchema.getFieldNames());
+ writeCols);
}
private void trySyncLatestCompaction(boolean blocking)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index eb2f1f669c..dca05a6522 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -75,6 +75,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
private final RowType rowType;
private RowType writeType;
+ private @Nullable List<String> writeCols;
private boolean forceBufferSpill = false;
public BaseAppendFileStoreWrite(
@@ -95,6 +96,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
this.schemaId = schemaId;
this.rowType = rowType;
this.writeType = rowType;
+ this.writeCols = null;
this.fileFormat = fileFormat(options);
this.pathFactory = pathFactory;
@@ -116,8 +118,8 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
schemaId,
fileFormat,
options.targetFileSize(false),
- rowType,
writeType,
+ writeCols,
restoredMaxSeqNumber,
getCompactManager(partition, bucket, restoredFiles, compactExecutor, dvMaintainer),
// it is only for new files, no dv
@@ -139,6 +141,15 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
@Override
public void withWriteType(RowType writeType) {
this.writeType = writeType;
+ int fullCount = rowType.getFieldCount();
+ List<String> fullNames = rowType.getFieldNames();
+ this.writeCols = writeType.getFieldNames();
+ // optimize writeCols to null in following cases:
+ // 1. writeType contains all columns
+ // 2. writeType contains all columns and append _ROW_ID cols
+ if (writeCols.size() >= fullCount && writeCols.subList(0, fullCount).equals(fullNames)) {
+ writeCols = null;
+ }
}
private SimpleColStatsCollector.Factory[] statsCollectors() {
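
The new branch above clears writeCols whenever writeType starts with the full table schema, so a writeType that appends system columns such as _ROW_ID after all table columns still counts as "write everything". A self-contained sketch of that prefix check (helper and schemas are illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class WriteColsSketch {
        // Mirrors the normalization in withWriteType: null means "all columns".
        static List<String> normalize(List<String> tableNames, List<String> writeNames) {
            int fullCount = tableNames.size();
            if (writeNames.size() >= fullCount
                    && writeNames.subList(0, fullCount).equals(tableNames)) {
                return null;
            }
            return writeNames;
        }

        public static void main(String[] args) {
            List<String> table = Arrays.asList("id", "name");
            System.out.println(normalize(table, Arrays.asList("id", "name")));            // null
            System.out.println(normalize(table, Arrays.asList("id", "name", "_ROW_ID"))); // null
            System.out.println(normalize(table, Arrays.asList("id")));                    // [id]
        }
    }
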
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
similarity index 96%
rename from paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java
rename to paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 94b0d339ec..af1e5b767f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -27,7 +27,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.reader.CompoundFileReader;
+import org.apache.paimon.reader.DataEvolutionFileReader;
import org.apache.paimon.reader.EmptyFileRecordReader;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
@@ -56,9 +56,9 @@ import static java.lang.String.format;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
-public class FieldMergeSplitRead extends RawFileSplitRead {
+public class DataEvolutionSplitRead extends RawFileSplitRead {
- public FieldMergeSplitRead(
+ public DataEvolutionSplitRead(
FileIO fileIO,
SchemaManager schemaManager,
TableSchema schema,
@@ -106,7 +106,7 @@ public class FieldMergeSplitRead extends RawFileSplitRead {
} else {
suppliers.add(
() ->
- createCompoundFileReader(
+ createFileReader(
needMergeFiles,
partition,
dataFilePathFactory,
@@ -118,7 +118,7 @@ public class FieldMergeSplitRead extends RawFileSplitRead {
return ConcatRecordReader.create(suppliers);
}
- private CompoundFileReader createCompoundFileReader(
+ private DataEvolutionFileReader createFileReader(
List<DataFileMeta> needMergeFiles,
BinaryRow partition,
DataFilePathFactory dataFilePathFactory,
@@ -222,6 +222,6 @@ public class FieldMergeSplitRead extends RawFileSplitRead {
allReadFields.get(i)));
}
}
- return new CompoundFileReader(rowOffsets, fieldOffsets, fileRecordReaders);
+ return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders);
}
}
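
DataEvolutionSplitRead builds one ReaderSupplier per group of files and concatenates them through ConcatRecordReader.create(suppliers), so a reader is only opened when the previous group is exhausted. A generic sketch of that lazy-concat pattern, reduced to plain iterators rather than Paimon's reader API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.function.Supplier;

    public class LazyConcatSketch {
        static <T> Iterator<T> concat(List<Supplier<Iterator<T>>> suppliers) {
            return new Iterator<T>() {
                final Iterator<Supplier<Iterator<T>>> outer = suppliers.iterator();
                Iterator<T> current;

                public boolean hasNext() {
                    while ((current == null || !current.hasNext()) && outer.hasNext()) {
                        current = outer.next().get(); // created only when reached
                    }
                    return current != null && current.hasNext();
                }

                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return current.next();
                }
            };
        }

        public static void main(String[] args) {
            List<Supplier<Iterator<String>>> suppliers = new ArrayList<>();
            suppliers.add(() -> Arrays.asList("group-1-row").iterator());
            suppliers.add(() -> Arrays.asList("group-2-row").iterator());
            concat(suppliers).forEachRemaining(System.out::println);
        }
    }
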
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 82e62896df..06d7d4117f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -650,7 +650,7 @@ public class SchemaValidation {
}
private static void validateDataEvolution(CoreOptions options) {
- if (options.dataElolutionEnabled()) {
+ if (options.dataEvolutionEnabled()) {
checkArgument(
options.rowTrackingEnabled(),
"Data evolution config must enabled with
row-tracking.enabled");
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 9f59da690e..367eeb714b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -35,11 +35,18 @@ import org.apache.paimon.table.source.AppendTableRead;
import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.splitread.AppendTableRawFileSplitReadProvider;
+import org.apache.paimon.table.source.splitread.DataEvolutionSplitReadProvider;
+import org.apache.paimon.table.source.splitread.SplitReadConfig;
+import org.apache.paimon.table.source.splitread.SplitReadProvider;
import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.BiConsumer;
+import java.util.function.Function;
/** {@link FileStoreTable} for append table. */
public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
@@ -82,7 +89,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
protected SplitGenerator splitGenerator() {
long targetSplitSize = store().options().splitTargetSize();
long openFileCost = store().options().splitOpenFileCost();
- return coreOptions().dataElolutionEnabled()
+ return coreOptions().dataEvolutionEnabled()
? new DataEvolutionSplitGenerator(targetSplitSize, openFileCost)
: new AppendOnlySplitGenerator(targetSplitSize, openFileCost, bucketMode());
}
@@ -99,11 +106,17 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
@Override
public InnerTableRead newRead() {
- return new AppendTableRead(
- () -> store().newRead(),
- () -> store().newFieldMergeRead(),
- schema(),
- coreOptions());
+ List<Function<SplitReadConfig, SplitReadProvider>> providerFactories = new ArrayList<>();
+ if (coreOptions().dataEvolutionEnabled()) {
+ // add data evolution first
+ providerFactories.add(
+ config ->
+ new DataEvolutionSplitReadProvider(
+ () -> store().newDataEvolutionRead(), config));
+ }
+ providerFactories.add(
+ config -> new AppendTableRawFileSplitReadProvider(() -> store().newRead(), config));
+ return new AppendTableRead(providerFactories, schema());
}
@Override
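
Registration order matters in the new newRead(): the data-evolution provider is added first, so the first-match dispatch in AppendTableRead consults it before the raw-file fallback. A minimal sketch of that dispatch (provider names and match rules are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    public class ProviderOrderSketch {
        interface Provider {
            boolean match(String split);
            String name();
        }

        static Provider provider(String name, Function<String, Boolean> matcher) {
            return new Provider() {
                public boolean match(String split) { return matcher.apply(split); }
                public String name() { return name; }
            };
        }

        public static void main(String[] args) {
            List<Provider> providers = new ArrayList<>();
            // Added first, so it wins for the splits it recognizes.
            providers.add(provider("data-evolution", s -> s.startsWith("shared-row-id")));
            // Fallback that accepts everything, like the raw-file provider.
            providers.add(provider("raw-file", s -> true));

            for (String split : new String[] {"shared-row-id-split", "plain-split"}) {
                for (Provider p : providers) {
                    if (p.match(split)) {
                        System.out.println(split + " -> " + p.name());
                        break;
                    }
                }
            }
        }
    }
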
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
index d8616d27f5..416bb58a30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java
@@ -38,6 +38,9 @@ public interface BatchTableWrite extends TableWrite {
*/
List<CommitMessage> prepareCommit() throws Exception;
- /** Specified the write rowType. */
+ /**
+ * Specified the writing rowType, currently only work for table without primary key and row
+ * tracking enabled.
+ */
BatchTableWrite withWriteType(RowType writeType);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 063f09e841..83e26301ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -18,17 +18,13 @@
package org.apache.paimon.table.source;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.operation.FieldMergeSplitRead;
import org.apache.paimon.operation.MergeFileSplitRead;
-import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.source.splitread.AppendTableRawFileSplitReadProvider;
-import org.apache.paimon.table.source.splitread.MergeFieldSplitReadProvider;
+import org.apache.paimon.table.source.splitread.SplitReadConfig;
import org.apache.paimon.table.source.splitread.SplitReadProvider;
import org.apache.paimon.types.RowType;
@@ -37,7 +33,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.function.Supplier;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* An abstraction layer above {@link MergeFileSplitRead} to provide reading of {@link InternalRow}.
@@ -50,33 +47,26 @@ public final class AppendTableRead extends AbstractDataTableRead {
private Predicate predicate = null;
public AppendTableRead(
- Supplier<RawFileSplitRead> batchRawReadSupplier,
- Supplier<FieldMergeSplitRead> fieldMergeSplitReadSupplier,
- TableSchema schema,
- CoreOptions coreOptions) {
+ List<Function<SplitReadConfig, SplitReadProvider>> providerFactories,
+ TableSchema schema) {
super(schema);
- this.readProviders = new ArrayList<>();
- if (coreOptions.dataElolutionEnabled()) {
- // MergeFieldSplitReadProvider is used to read the field merge split
- readProviders.add(
- new MergeFieldSplitReadProvider(
- fieldMergeSplitReadSupplier, this::assignValues));
- }
- readProviders.add(
- new AppendTableRawFileSplitReadProvider(batchRawReadSupplier, this::assignValues));
+ this.readProviders =
+ providerFactories.stream()
+ .map(factory -> factory.apply(this::config))
+ .collect(Collectors.toList());
}
private List<SplitRead<InternalRow>> initialized() {
List<SplitRead<InternalRow>> readers = new ArrayList<>();
for (SplitReadProvider readProvider : readProviders) {
- if (readProvider.initialized()) {
- readers.add(readProvider.getOrCreate());
+ if (readProvider.get().initialized()) {
+ readers.add(readProvider.get().get());
}
}
return readers;
}
- private void assignValues(SplitRead<InternalRow> read) {
+ private void config(SplitRead<InternalRow> read) {
if (readType != null) {
read = read.withReadType(readType);
}
@@ -101,7 +91,7 @@ public final class AppendTableRead extends AbstractDataTableRead {
DataSplit dataSplit = (DataSplit) split;
for (SplitReadProvider readProvider : readProviders) {
if (readProvider.match(dataSplit, false)) {
- return readProvider.getOrCreate().createReader(dataSplit);
+ return readProvider.get().get().createReader(dataSplit);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index 4f6c6515f7..c4bede61d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -63,23 +63,23 @@ public final class KeyValueTableRead extends AbstractDataTableRead {
this.readProviders =
Arrays.asList(
new PrimaryKeyTableRawFileSplitReadProvider(
- batchRawReadSupplier, this::assignValues),
- new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues),
- new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues),
- new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues));
+ batchRawReadSupplier, this::config),
+ new MergeFileSplitReadProvider(mergeReadSupplier, this::config),
+ new IncrementalChangelogReadProvider(mergeReadSupplier, this::config),
+ new IncrementalDiffReadProvider(mergeReadSupplier, this::config));
}
private List<SplitRead<InternalRow>> initialized() {
List<SplitRead<InternalRow>> readers = new ArrayList<>();
for (SplitReadProvider readProvider : readProviders) {
- if (readProvider.initialized()) {
- readers.add(readProvider.getOrCreate());
+ if (readProvider.get().initialized()) {
+ readers.add(readProvider.get().get());
}
}
return readers;
}
- private void assignValues(SplitRead<InternalRow> read) {
+ private void config(SplitRead<InternalRow> read) {
if (forceKeepDelete) {
read = read.forceKeepDelete();
}
@@ -121,7 +121,7 @@ public final class KeyValueTableRead extends AbstractDataTableRead {
DataSplit dataSplit = (DataSplit) split;
for (SplitReadProvider readProvider : readProviders) {
if (readProvider.match(dataSplit, forceKeepDelete)) {
- return readProvider.getOrCreate().createReader(dataSplit);
+ return readProvider.get().get().createReader(dataSplit);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
index 4dd85e839d..4c6bf86008 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java
@@ -18,20 +18,17 @@
package org.apache.paimon.table.source.splitread;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.table.source.DataSplit;
-import java.util.function.Consumer;
import java.util.function.Supplier;
/** Raw file split read for all append table. */
public class AppendTableRawFileSplitReadProvider extends RawFileSplitReadProvider {
public AppendTableRawFileSplitReadProvider(
- Supplier<RawFileSplitRead> supplier, Consumer<SplitRead<InternalRow>> valuesAssigner) {
- super(supplier, valuesAssigner);
+ Supplier<RawFileSplitRead> supplier, SplitReadConfig splitReadConfig) {
+ super(supplier, splitReadConfig);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
similarity index 51%
copy from paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
copy to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
index a335a7c030..bcf2e9fa1b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java
@@ -18,48 +18,56 @@
package org.apache.paimon.table.source.splitread;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.operation.MergeFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.LazyField;
-import java.util.function.Consumer;
+import java.util.List;
import java.util.function.Supplier;
-/** A {@link SplitReadProvider} to batch incremental diff read. */
-public class IncrementalDiffReadProvider implements SplitReadProvider {
+/** A {@link SplitReadProvider} to create {@link DataEvolutionSplitRead}. */
+public class DataEvolutionSplitReadProvider implements SplitReadProvider {
- private final LazyField<SplitRead<InternalRow>> splitRead;
+ private final LazyField<DataEvolutionSplitRead> splitRead;
- public IncrementalDiffReadProvider(
- Supplier<MergeFileSplitRead> supplier,
- Consumer<SplitRead<InternalRow>> valuesAssigner) {
+ public DataEvolutionSplitReadProvider(
+ Supplier<DataEvolutionSplitRead> supplier, SplitReadConfig splitReadConfig) {
this.splitRead =
new LazyField<>(
() -> {
- SplitRead<InternalRow> read = create(supplier);
- valuesAssigner.accept(read);
+ DataEvolutionSplitRead read = supplier.get();
+ splitReadConfig.config(read);
return read;
});
}
- private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) {
- return new IncrementalDiffSplitRead(supplier.get());
- }
-
@Override
public boolean match(DataSplit split, boolean forceKeepDelete) {
- return !split.beforeFiles().isEmpty() && !split.isStreaming();
- }
+ List<DataFileMeta> files = split.dataFiles();
+ if (files.size() < 2) {
+ return false;
+ }
- @Override
- public boolean initialized() {
- return splitRead.initialized();
+ Long firstRowId = null;
+ for (DataFileMeta file : files) {
+ Long current = file.firstRowId();
+ if (current == null) {
+ return false;
+ }
+
+ if (firstRowId == null) {
+ firstRowId = current;
+ } else if (!firstRowId.equals(current)) {
+ return false;
+ }
+ }
+
+ return true;
}
@Override
- public SplitRead<InternalRow> getOrCreate() {
- return splitRead.get();
+ public LazyField<DataEvolutionSplitRead> get() {
+ return splitRead;
}
}
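
The match() above accepts a split only when it has at least two data files and every file reports the same non-null firstRowId, i.e. files that hold different column subsets of the same rows. The same predicate restated over bare row ids as a runnable sketch:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;

    public class MatchPredicateSketch {
        // Mirrors DataEvolutionSplitReadProvider#match: >= 2 files, one shared
        // non-null firstRowId across all of them.
        static boolean match(List<Long> firstRowIds) {
            if (firstRowIds.size() < 2) {
                return false;
            }
            Long first = firstRowIds.get(0);
            return first != null
                    && firstRowIds.stream().allMatch(id -> Objects.equals(id, first));
        }

        public static void main(String[] args) {
            System.out.println(match(Arrays.asList(100L, 100L))); // true
            System.out.println(match(Arrays.asList(100L, 200L))); // false
            System.out.println(match(Arrays.asList(100L, null))); // false
            System.out.println(match(Arrays.asList(100L)));       // false
        }
    }
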
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index eb41d02669..19d95001ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -30,7 +30,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOFunction;
import org.apache.paimon.utils.LazyField;
-import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
@@ -41,13 +40,12 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider {
private final LazyField<SplitRead<InternalRow>> splitRead;
public IncrementalChangelogReadProvider(
- Supplier<MergeFileSplitRead> supplier,
- Consumer<SplitRead<InternalRow>> valuesAssigner) {
+ Supplier<MergeFileSplitRead> supplier, SplitReadConfig splitReadConfig) {
this.splitRead =
new LazyField<>(
() -> {
SplitRead<InternalRow> read = create(supplier);
- valuesAssigner.accept(read);
+ splitReadConfig.config(read);
return read;
});
}
@@ -86,12 +84,7 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider {
}
@Override
- public boolean initialized() {
- return splitRead.initialized();
- }
-
- @Override
- public SplitRead<InternalRow> getOrCreate() {
- return splitRead.get();
+ public LazyField<SplitRead<InternalRow>> get() {
+ return splitRead;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
index a335a7c030..6d45aeed7e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java
@@ -24,7 +24,6 @@ import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.LazyField;
-import java.util.function.Consumer;
import java.util.function.Supplier;
/** A {@link SplitReadProvider} to batch incremental diff read. */
@@ -33,13 +32,12 @@ public class IncrementalDiffReadProvider implements SplitReadProvider {
private final LazyField<SplitRead<InternalRow>> splitRead;
public IncrementalDiffReadProvider(
- Supplier<MergeFileSplitRead> supplier,
- Consumer<SplitRead<InternalRow>> valuesAssigner) {
+ Supplier<MergeFileSplitRead> supplier, SplitReadConfig splitReadConfig) {
this.splitRead =
new LazyField<>(
() -> {
SplitRead<InternalRow> read = create(supplier);
- valuesAssigner.accept(read);
+ splitReadConfig.config(read);
return read;
});
}
@@ -54,12 +52,7 @@ public class IncrementalDiffReadProvider implements SplitReadProvider {
}
@Override
- public boolean initialized() {
- return splitRead.initialized();
- }
-
- @Override
- public SplitRead<InternalRow> getOrCreate() {
- return splitRead.get();
+ public LazyField<SplitRead<InternalRow>> get() {
+ return splitRead;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java
deleted file mode 100644
index 59a1f31b2a..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.splitread;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.operation.FieldMergeSplitRead;
-import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
-import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.utils.LazyField;
-
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-/** A {@link SplitReadProvider} to create {@link MergeFieldSplitReadProvider}. */
-public class MergeFieldSplitReadProvider implements SplitReadProvider {
-
- private final LazyField<FieldMergeSplitRead> splitRead;
-
- public MergeFieldSplitReadProvider(
- Supplier<FieldMergeSplitRead> supplier,
- Consumer<SplitRead<InternalRow>> valuesAssigner) {
- this.splitRead =
- new LazyField<>(
- () -> {
- FieldMergeSplitRead read = supplier.get();
- valuesAssigner.accept(read);
- return read;
- });
- }
-
- @Override
- public boolean match(DataSplit split, boolean forceKeepDelete) {
- List<DataFileMeta> files = split.dataFiles();
- boolean onlyAppendFiles =
- files.stream()
- .allMatch(
- f ->
- f.fileSource().isPresent()
- && f.fileSource().get() == FileSource.APPEND
- && f.firstRowId() != null);
- if (onlyAppendFiles) {
- // contains same first row id, need merge fields
- return files.stream().mapToLong(DataFileMeta::firstRowId).distinct().count()
- != files.size();
- }
- return false;
- }
-
- @Override
- public boolean initialized() {
- return splitRead.initialized();
- }
-
- @Override
- public RawFileSplitRead getOrCreate() {
- return splitRead.get();
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
index a748828912..5aa0a8d62e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -25,7 +25,6 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LazyField;
-import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
@@ -36,13 +35,12 @@ public class MergeFileSplitReadProvider implements SplitReadProvider {
private final LazyField<SplitRead<InternalRow>> splitRead;
public MergeFileSplitReadProvider(
- Supplier<MergeFileSplitRead> supplier,
- Consumer<SplitRead<InternalRow>> valuesAssigner) {
+ Supplier<MergeFileSplitRead> supplier, SplitReadConfig splitReadConfig) {
this.splitRead =
new LazyField<>(
() -> {
SplitRead<InternalRow> read = create(supplier);
- valuesAssigner.accept(read);
+ splitReadConfig.config(read);
return read;
});
}
@@ -58,12 +56,7 @@ public class MergeFileSplitReadProvider implements SplitReadProvider {
}
@Override
- public boolean initialized() {
- return splitRead.initialized();
- }
-
- @Override
- public SplitRead<InternalRow> getOrCreate() {
- return splitRead.get();
+ public LazyField<SplitRead<InternalRow>> get() {
+ return splitRead;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java
index d52e2a902b..ab8671c966 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java
@@ -18,21 +18,18 @@
package org.apache.paimon.table.source.splitread;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.table.source.DataSplit;
-import java.util.function.Consumer;
import java.util.function.Supplier;
/** Raw file split read for all primary key table. */
public class PrimaryKeyTableRawFileSplitReadProvider extends RawFileSplitReadProvider {
public PrimaryKeyTableRawFileSplitReadProvider(
- Supplier<RawFileSplitRead> supplier, Consumer<SplitRead<InternalRow>> valuesAssigner) {
- super(supplier, valuesAssigner);
+ Supplier<RawFileSplitRead> supplier, SplitReadConfig splitReadConfig) {
+ super(supplier, splitReadConfig);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
index bba0b557ec..75ab25ca91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java
@@ -18,12 +18,9 @@
package org.apache.paimon.table.source.splitread;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.utils.LazyField;
-import java.util.function.Consumer;
import java.util.function.Supplier;
/** A {@link SplitReadProvider} to create {@link RawFileSplitRead}. */
@@ -32,23 +29,18 @@ public abstract class RawFileSplitReadProvider implements SplitReadProvider {
private final LazyField<RawFileSplitRead> splitRead;
public RawFileSplitReadProvider(
- Supplier<RawFileSplitRead> supplier, Consumer<SplitRead<InternalRow>> valuesAssigner) {
+ Supplier<RawFileSplitRead> supplier, SplitReadConfig splitReadConfig) {
this.splitRead =
new LazyField<>(
() -> {
RawFileSplitRead read = supplier.get();
- valuesAssigner.accept(read);
+ splitReadConfig.config(read);
return read;
});
}
@Override
- public boolean initialized() {
- return splitRead.initialized();
- }
-
- @Override
- public SplitRead<InternalRow> getOrCreate() {
- return splitRead.get();
+ public LazyField<RawFileSplitRead> get() {
+ return splitRead;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java
similarity index 78%
copy from paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
copy to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java
index 2aaefb322f..83ebea0686 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java
@@ -20,14 +20,9 @@ package org.apache.paimon.table.source.splitread;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.operation.SplitRead;
-import org.apache.paimon.table.source.DataSplit;
-/** Provider to create {@link SplitRead}. */
-public interface SplitReadProvider {
+/** Config {@link SplitRead} with projection, filter and others. */
+public interface SplitReadConfig {
- boolean match(DataSplit split, boolean forceKeepDelete);
-
- boolean initialized();
-
- SplitRead<InternalRow> getOrCreate();
+ void config(SplitRead<InternalRow> read);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
index 2aaefb322f..dfc32501d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java
@@ -21,13 +21,12 @@ package org.apache.paimon.table.source.splitread;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.LazyField;
/** Provider to create {@link SplitRead}. */
public interface SplitReadProvider {
boolean match(DataSplit split, boolean forceKeepDelete);
- boolean initialized();
-
- SplitRead<InternalRow> getOrCreate();
+ LazyField<? extends SplitRead<InternalRow>> get();
}
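
With get() now exposing the LazyField itself, callers such as initialized() in AppendTableRead and KeyValueTableRead can probe whether a read was ever created without forcing creation. A simplified stand-in for org.apache.paimon.utils.LazyField showing that contract (a sketch, not the real class):

    import java.util.function.Supplier;

    public class LazyFieldSketch<T> {
        private final Supplier<T> supplier;
        private T value;
        private boolean initialized;

        public LazyFieldSketch(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        public boolean initialized() {
            return initialized;
        }

        public T get() {
            if (!initialized) {
                value = supplier.get();
                initialized = true;
            }
            return value;
        }

        public static void main(String[] args) {
            LazyFieldSketch<String> lazy = new LazyFieldSketch<>(() -> "created");
            System.out.println(lazy.initialized()); // false: nothing forced yet
            System.out.println(lazy.get());         // created
            System.out.println(lazy.initialized()); // true
        }
    }
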
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index bf4952bcd7..ad91a3c70a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -622,7 +622,7 @@ public class AppendOnlyWriterTest {
fileFormat,
targetFileSize,
AppendOnlyWriterTest.SCHEMA,
- AppendOnlyWriterTest.SCHEMA,
+ null,
getMaxSequenceNumber(toCompact),
compactManager,
null,
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 55595d13d3..ca0b65b2ee 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -85,7 +85,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
fileFormat,
10,
SCHEMA,
- SCHEMA,
+ null,
0,
new BucketedAppendCompactManager(
null, toCompact, null, 4, 10, false, null, null), // not used
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
similarity index 97%
rename from paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index cbf0fa31df..8cbc0413ff 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -23,8 +23,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.operation.MergeFileSplitRead;
-import org.apache.paimon.reader.CompoundFileReader;
+import org.apache.paimon.reader.DataEvolutionFileReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -47,8 +46,8 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-/** Test for {@link MergeFileSplitRead}. */
-public class AppendMergeFieldTest extends TableTestBase {
+/** Test for table with data evolution. */
+public class DataEvolutionTableTest extends TableTestBase {
@Test
public void testBasic() throws Exception {
@@ -76,7 +75,7 @@ public class AppendMergeFieldTest extends TableTestBase {
ReadBuilder readBuilder = getTableDefault().newReadBuilder();
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan());
- assertThat(reader).isInstanceOf(CompoundFileReader.class);
+ assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
reader.forEachRemaining(
r -> {
assertThat(r.getInt(0)).isEqualTo(1);
@@ -203,7 +202,7 @@ public class AppendMergeFieldTest extends TableTestBase {
ReadBuilder readBuilder = getTableDefault().newReadBuilder();
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan());
- assertThat(reader).isInstanceOf(CompoundFileReader.class);
+ assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
reader.forEachRemaining(
r -> {
@@ -375,7 +374,7 @@ public class AppendMergeFieldTest extends TableTestBase {
ReadBuilder readBuilder = getTableDefault().newReadBuilder();
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan());
- assertThat(reader).isInstanceOf(CompoundFileReader.class);
+ assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
AtomicInteger i = new AtomicInteger(0);
reader.forEachRemaining(
r -> {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
new file mode 100644
index 0000000000..b53a33481a
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.splitread;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.DataEvolutionSplitRead;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link DataEvolutionSplitReadProvider}. */
+public class DataEvolutionSplitReadProviderTest {
+
+ private Supplier<DataEvolutionSplitRead> mockSupplier;
+ private SplitReadConfig mockSplitReadConfig;
+ private DataEvolutionSplitRead mockSplitRead;
+ private DataEvolutionSplitReadProvider provider;
+
+ @SuppressWarnings("unchecked")
+ @BeforeEach
+ public void setUp() {
+ mockSupplier = (Supplier<DataEvolutionSplitRead>) mock(Supplier.class);
+ mockSplitReadConfig = mock(SplitReadConfig.class);
+ mockSplitRead = mock(DataEvolutionSplitRead.class);
+ when(mockSupplier.get()).thenReturn(mockSplitRead);
+
+ provider = new DataEvolutionSplitReadProvider(mockSupplier, mockSplitReadConfig);
+ }
+
+ @Test
+ public void testGetAndInitialization() {
+ // Supplier should not be called yet due to lazy initialization
+ verify(mockSupplier, times(0)).get();
+
+ // First access, should trigger initialization
+ DataEvolutionSplitRead read = provider.get().get();
+
+ // Verify supplier and config were called
+ verify(mockSupplier, times(1)).get();
+ verify(mockSplitReadConfig, times(1)).config(mockSplitRead);
+ assertThat(read).isSameAs(mockSplitRead);
+
+ // Second access, should return cached instance without re-initializing
+ DataEvolutionSplitRead read2 = provider.get().get();
+ verify(mockSupplier, times(1)).get();
+ verify(mockSplitReadConfig, times(1)).config(mockSplitRead);
+ assertThat(read2).isSameAs(mockSplitRead);
+ }
+
+ @Test
+ public void testMatchWithNoFiles() {
+ DataSplit split = mock(DataSplit.class);
+ when(split.dataFiles()).thenReturn(Collections.emptyList());
+ assertThat(provider.match(split, false)).isFalse();
+ }
+
+ @Test
+ public void testMatchWithOneFile() {
+ DataSplit split = mock(DataSplit.class);
+ DataFileMeta file1 = mock(DataFileMeta.class);
+ when(split.dataFiles()).thenReturn(Collections.singletonList(file1));
+ assertThat(provider.match(split, false)).isFalse();
+ }
+
+ @Test
+ public void testMatchWithNullFirstRowId() {
+ DataSplit split = mock(DataSplit.class);
+ DataFileMeta file1 = mock(DataFileMeta.class);
+ DataFileMeta file2 = mock(DataFileMeta.class);
+
+ when(file1.firstRowId()).thenReturn(1L);
+ when(file2.firstRowId()).thenReturn(null);
+ when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+
+ assertThat(provider.match(split, false)).isFalse();
+ }
+
+ @Test
+ public void testMatchWithDifferentFirstRowIds() {
+ DataSplit split = mock(DataSplit.class);
+ DataFileMeta file1 = mock(DataFileMeta.class);
+ DataFileMeta file2 = mock(DataFileMeta.class);
+
+ when(file1.firstRowId()).thenReturn(1L);
+ when(file2.firstRowId()).thenReturn(2L);
+ when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+
+ assertThat(provider.match(split, false)).isFalse();
+ }
+
+ @Test
+ public void testMatchSuccess() {
+ DataSplit split = mock(DataSplit.class);
+ DataFileMeta file1 = mock(DataFileMeta.class);
+ DataFileMeta file2 = mock(DataFileMeta.class);
+
+ when(file1.firstRowId()).thenReturn(100L);
+ when(file2.firstRowId()).thenReturn(100L);
+ when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2));
+
+ // The forceKeepDelete parameter is not used in match, so test both values
+ assertThat(provider.match(split, true)).isTrue();
+ assertThat(provider.match(split, false)).isTrue();
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
index 8648bf734e..8d8a668f79 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java
@@ -70,7 +70,7 @@ public class AppendPreCommitCompactWorkerOperator extends AbstractStreamOperator
this.write = (AppendFileStoreWrite) table.store().newWrite(null);
if (coreOptions.rowTrackingEnabled()) {
checkArgument(
- !coreOptions.dataElolutionEnabled(),
+ !coreOptions.dataEvolutionEnabled(),
"Data evolution enabled table should not invoke compact
yet.");
this.write.withWriteType(SpecialFields.rowTypeWithRowLineage(table.rowType()));
}