This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a7f1455bd8 [core] Refactor and add tests for data evolution union read (#6053)
a7f1455bd8 is described below

commit a7f1455bd8801302d0b45db87a0b627bb3a2b08d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 11 15:42:30 2025 +0800

    [core] Refactor and add tests for data evolution union read (#6053)
---
 .../paimon/reader/DataEvolutionFileReader.java     | 210 ++-------------------
 .../paimon/reader/DataEvolutionIterator.java       |  65 +++++++
 .../org/apache/paimon/reader/DataEvolutionRow.java | 161 ++++++++++++++++
 .../paimon/reader/DataEvolutionIteratorTest.java   | 149 +++++++++++++++
 .../apache/paimon/reader/DataEvolutionRowTest.java | 192 +++++++++++++++++++
 .../org/apache/paimon/AppendOnlyFileStore.java     |   3 +-
 .../paimon/operation/DataEvolutionSplitRead.java   | 209 ++++++++++++--------
 .../apache/paimon/operation/RawFileSplitRead.java  |  72 ++++---
 .../org/apache/paimon/schema/SchemaValidation.java |  16 +-
 9 files changed, 760 insertions(+), 317 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
index 9c75f0e114..1c50410fa1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
@@ -18,14 +18,7 @@
 
 package org.apache.paimon.reader;
 
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.variant.Variant;
-import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.IOUtils;
 
 import javax.annotation.Nullable;
@@ -35,7 +28,7 @@ import java.io.IOException;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
- * This is a union reader which contains multiple inner reader.
+ * This is a union reader which contains multiple inner readers.
  *
  * <pre> This reader assembles multiple readers into one big reader. The rows it produces
  * also come from the readers it contains.
@@ -63,7 +56,7 @@ public class DataEvolutionFileReader implements RecordReader<InternalRow> {
 
     private final int[] rowOffsets;
     private final int[] fieldOffsets;
-    private final RecordReader<InternalRow>[] innerReaders;
+    private final RecordReader<InternalRow>[] readers;
 
     public DataEvolutionFileReader(
            int[] rowOffsets, int[] fieldOffsets, RecordReader<InternalRow>[] readers) {
@@ -76,204 +69,35 @@ public class DataEvolutionFileReader implements RecordReader<InternalRow> {
         checkArgument(readers != null && readers.length > 1, "Readers should be more than 1");
         this.rowOffsets = rowOffsets;
         this.fieldOffsets = fieldOffsets;
-        this.innerReaders = readers;
+        this.readers = readers;
     }
 
     @Override
     @Nullable
     public RecordIterator<InternalRow> readBatch() throws IOException {
-        DataEvolutionIterator iterator =
-                new DataEvolutionIterator(innerReaders.length, rowOffsets, fieldOffsets);
-        for (int i = 0; i < innerReaders.length; i++) {
-            RecordIterator<InternalRow> batch = innerReaders[i].readBatch();
-            if (batch == null && !(innerReaders[i] instanceof EmptyFileRecordReader)) {
-                return null;
+        DataEvolutionRow row = new DataEvolutionRow(readers.length, rowOffsets, fieldOffsets);
+        RecordIterator<InternalRow>[] iterators = new RecordIterator[readers.length];
+        for (int i = 0; i < readers.length; i++) {
+            RecordReader<InternalRow> reader = readers[i];
+            if (reader != null) {
+                RecordIterator<InternalRow> batch = reader.readBatch();
+                if (batch == null) {
+                    // all readers are aligned, as long as one returns null, the others will also
+                    // have no data
+                    return null;
+                }
+                iterators[i] = batch;
             }
-            iterator.set(i, batch);
         }
-
-        return iterator;
+        return new DataEvolutionIterator(row, iterators);
     }
 
     @Override
     public void close() throws IOException {
         try {
-            IOUtils.closeAll(innerReaders);
+            IOUtils.closeAll(readers);
         } catch (Exception e) {
             throw new IOException("Failed to close inner readers", e);
         }
     }
-
-    /** The batch which is made up by several batches. */
-    private static class DataEvolutionIterator implements RecordIterator<InternalRow> {
-
-        private final DataEvolutionRow dataEvolutionRow;
-        private final RecordIterator<InternalRow>[] iterators;
-
-        private DataEvolutionIterator(
-                int rowNumber,
-                int[] rowOffsets,
-                int[] fieldOffsets) { // Initialize with empty arrays, will be set later
-            this.dataEvolutionRow = new DataEvolutionRow(rowNumber, rowOffsets, fieldOffsets);
-            //noinspection unchecked
-            this.iterators = new RecordIterator[rowNumber];
-        }
-
-        public void set(int i, RecordIterator<InternalRow> iterator) {
-            iterators[i] = iterator;
-        }
-
-        @Nullable
-        @Override
-        public InternalRow next() throws IOException {
-            for (int i = 0; i < iterators.length; i++) {
-                if (iterators[i] != null) {
-                    InternalRow next = iterators[i].next();
-                    if (next == null) {
-                        return null;
-                    }
-                    dataEvolutionRow.setRow(i, next);
-                }
-            }
-            return dataEvolutionRow;
-        }
-
-        @Override
-        public void releaseBatch() {
-            for (RecordIterator<InternalRow> iterator : iterators) {
-                if (iterator != null) {
-                    iterator.releaseBatch();
-                }
-            }
-        }
-    }
-
-    /** The row which is made up by several rows. */
-    private static class DataEvolutionRow implements InternalRow {
-
-        private final InternalRow[] rows;
-        private final int[] rowOffsets;
-        private final int[] fieldOffsets;
-
-        private DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
-            this.rows = new InternalRow[rowNumber];
-            this.rowOffsets = rowOffsets;
-            this.fieldOffsets = fieldOffsets;
-        }
-
-        private void setRow(int pos, InternalRow row) {
-            if (pos >= rows.length) {
-                throw new IndexOutOfBoundsException(
-                        "Position " + pos + " is out of bounds for rows size " + rows.length);
-            } else {
-                rows[pos] = row;
-            }
-        }
-
-        private InternalRow chooseRow(int pos) {
-            return rows[(rowOffsets[pos])];
-        }
-
-        private int offsetInRow(int pos) {
-            return fieldOffsets[pos];
-        }
-
-        @Override
-        public int getFieldCount() {
-            return fieldOffsets.length;
-        }
-
-        @Override
-        public RowKind getRowKind() {
-            return rows[0].getRowKind();
-        }
-
-        @Override
-        public void setRowKind(RowKind kind) {
-            rows[0].setRowKind(kind);
-        }
-
-        @Override
-        public boolean isNullAt(int pos) {
-            if (rowOffsets[pos] == -1) {
-                return true;
-            }
-            return chooseRow(pos).isNullAt(offsetInRow(pos));
-        }
-
-        @Override
-        public boolean getBoolean(int pos) {
-            return chooseRow(pos).getBoolean(offsetInRow(pos));
-        }
-
-        @Override
-        public byte getByte(int pos) {
-            return chooseRow(pos).getByte(offsetInRow(pos));
-        }
-
-        @Override
-        public short getShort(int pos) {
-            return chooseRow(pos).getShort(offsetInRow(pos));
-        }
-
-        @Override
-        public int getInt(int pos) {
-            return chooseRow(pos).getInt(offsetInRow(pos));
-        }
-
-        @Override
-        public long getLong(int pos) {
-            return chooseRow(pos).getLong(offsetInRow(pos));
-        }
-
-        @Override
-        public float getFloat(int pos) {
-            return chooseRow(pos).getFloat(offsetInRow(pos));
-        }
-
-        @Override
-        public double getDouble(int pos) {
-            return chooseRow(pos).getDouble(offsetInRow(pos));
-        }
-
-        @Override
-        public BinaryString getString(int pos) {
-            return chooseRow(pos).getString(offsetInRow(pos));
-        }
-
-        @Override
-        public Decimal getDecimal(int pos, int precision, int scale) {
-            return chooseRow(pos).getDecimal(offsetInRow(pos), precision, scale);
-        }
-
-        @Override
-        public Timestamp getTimestamp(int pos, int precision) {
-            return chooseRow(pos).getTimestamp(offsetInRow(pos), precision);
-        }
-
-        @Override
-        public byte[] getBinary(int pos) {
-            return chooseRow(pos).getBinary(offsetInRow(pos));
-        }
-
-        @Override
-        public Variant getVariant(int pos) {
-            return chooseRow(pos).getVariant(offsetInRow(pos));
-        }
-
-        @Override
-        public InternalArray getArray(int pos) {
-            return chooseRow(pos).getArray(offsetInRow(pos));
-        }
-
-        @Override
-        public InternalMap getMap(int pos) {
-            return chooseRow(pos).getMap(offsetInRow(pos));
-        }
-
-        @Override
-        public InternalRow getRow(int pos, int numFields) {
-            return chooseRow(pos).getRow(offsetInRow(pos), numFields);
-        }
-    }
 }
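
To make the new composition concrete, here is a minimal, self-contained sketch of driving the union reader (illustrative only, not part of this commit; the in-memory readerOf helper and the two-file schema layout are assumptions of the example):

    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.reader.DataEvolutionFileReader;
    import org.apache.paimon.reader.RecordReader;
    import org.apache.paimon.reader.RecordReader.RecordIterator;

    import java.util.Arrays;
    import java.util.Iterator;

    public class UnionReadSketch {

        // Hypothetical one-batch reader; real readers wrap format files.
        static RecordReader<InternalRow> readerOf(InternalRow... rows) {
            Iterator<InternalRow> it = Arrays.asList(rows).iterator();
            return new RecordReader<InternalRow>() {
                boolean consumed;

                @Override
                public RecordIterator<InternalRow> readBatch() {
                    if (consumed) {
                        return null; // aligned readers run out of batches together
                    }
                    consumed = true;
                    return new RecordIterator<InternalRow>() {
                        @Override
                        public InternalRow next() {
                            return it.hasNext() ? it.next() : null;
                        }

                        @Override
                        public void releaseBatch() {}
                    };
                }

                @Override
                public void close() {}
            };
        }

        public static void main(String[] args) throws Exception {
            // Read schema (id, v1, v2): file A holds (id, v1), file B holds (v2).
            int[] rowOffsets = {0, 0, 1};   // which inner reader serves field i
            int[] fieldOffsets = {0, 1, 0}; // field position inside that reader's row
            @SuppressWarnings("unchecked")
            RecordReader<InternalRow>[] readers =
                    new RecordReader[] {
                        readerOf(GenericRow.of(1L, 10), GenericRow.of(2L, 20)), // file A
                        readerOf(GenericRow.of(1.5d), GenericRow.of(2.5d))      // file B
                    };
            RecordReader<InternalRow> union =
                    new DataEvolutionFileReader(rowOffsets, fieldOffsets, readers);
            RecordIterator<InternalRow> batch;
            while ((batch = union.readBatch()) != null) {
                InternalRow row;
                while ((row = batch.next()) != null) {
                    // prints "1, 10, 1.5" then "2, 20, 2.5"
                    System.out.println(
                            row.getLong(0) + ", " + row.getInt(1) + ", " + row.getDouble(2));
                }
                batch.releaseBatch();
            }
            union.close();
        }
    }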
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java
new file mode 100644
index 0000000000..17a68454f0
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * The batch which is made up by several batches. It assumes that all iterators are aligned, and as
+ * long as one returns null, the others will also have no data.
+ */
+public class DataEvolutionIterator implements RecordIterator<InternalRow> {
+
+    private final DataEvolutionRow row;
+    private final RecordIterator<InternalRow>[] iterators;
+
+    public DataEvolutionIterator(DataEvolutionRow row, RecordIterator<InternalRow>[] iterators) {
+        this.row = row;
+        this.iterators = iterators;
+    }
+
+    @Nullable
+    @Override
+    public InternalRow next() throws IOException {
+        for (int i = 0; i < iterators.length; i++) {
+            if (iterators[i] != null) {
+                InternalRow next = iterators[i].next();
+                if (next == null) {
+                    return null;
+                }
+                row.setRow(i, next);
+            }
+        }
+        return row;
+    }
+
+    @Override
+    public void releaseBatch() {
+        for (RecordIterator<InternalRow> iterator : iterators) {
+            if (iterator != null) {
+                iterator.releaseBatch();
+            }
+        }
+    }
+}
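
A short usage sketch of this alignment contract (illustrative; batchOf is a hypothetical in-memory batch, and the null slot stands for a file from which no fields are read):

    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.reader.DataEvolutionIterator;
    import org.apache.paimon.reader.DataEvolutionRow;
    import org.apache.paimon.reader.RecordReader.RecordIterator;

    import java.util.Arrays;
    import java.util.Iterator;

    public class AlignmentSketch {

        static RecordIterator<InternalRow> batchOf(InternalRow... rows) {
            Iterator<InternalRow> it = Arrays.asList(rows).iterator();
            return new RecordIterator<InternalRow>() {
                @Override
                public InternalRow next() {
                    return it.hasNext() ? it.next() : null;
                }

                @Override
                public void releaseBatch() {}
            };
        }

        public static void main(String[] args) throws Exception {
            // Field 0 comes from slot 0, field 1 exists in no file, field 2 from slot 2.
            DataEvolutionRow row =
                    new DataEvolutionRow(3, new int[] {0, -1, 2}, new int[] {0, -1, 0});
            @SuppressWarnings("unchecked")
            RecordIterator<InternalRow>[] iterators =
                    new RecordIterator[] {
                        batchOf(GenericRow.of(7)), null, batchOf(GenericRow.of(8L))
                    };
            DataEvolutionIterator iterator = new DataEvolutionIterator(row, iterators);

            InternalRow first = iterator.next();
            System.out.println(first.getInt(0));   // 7
            System.out.println(first.isNullAt(1)); // true: rowOffsets[1] == -1
            System.out.println(first.getLong(2));  // 8
            System.out.println(iterator.next());   // null: the aligned batches end together
        }
    }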
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
new file mode 100644
index 0000000000..db357ae6a6
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+/** The row which is made up by several rows. */
+public class DataEvolutionRow implements InternalRow {
+
+    private final InternalRow[] rows;
+    private final int[] rowOffsets;
+    private final int[] fieldOffsets;
+
+    public DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
+        this.rows = new InternalRow[rowNumber];
+        this.rowOffsets = rowOffsets;
+        this.fieldOffsets = fieldOffsets;
+    }
+
+    public int rowNumber() {
+        return rows.length;
+    }
+
+    public void setRow(int pos, InternalRow row) {
+        if (pos >= rows.length) {
+            throw new IndexOutOfBoundsException(
+                    "Position " + pos + " is out of bounds for rows size " + rows.length);
+        } else {
+            rows[pos] = row;
+        }
+    }
+
+    private InternalRow chooseRow(int pos) {
+        return rows[(rowOffsets[pos])];
+    }
+
+    private int offsetInRow(int pos) {
+        return fieldOffsets[pos];
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fieldOffsets.length;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return rows[0].getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        rows[0].setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        if (rowOffsets[pos] == -1) {
+            return true;
+        }
+        return chooseRow(pos).isNullAt(offsetInRow(pos));
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return chooseRow(pos).getBoolean(offsetInRow(pos));
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return chooseRow(pos).getByte(offsetInRow(pos));
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return chooseRow(pos).getShort(offsetInRow(pos));
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return chooseRow(pos).getInt(offsetInRow(pos));
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return chooseRow(pos).getLong(offsetInRow(pos));
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return chooseRow(pos).getFloat(offsetInRow(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return chooseRow(pos).getDouble(offsetInRow(pos));
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return chooseRow(pos).getString(offsetInRow(pos));
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        return chooseRow(pos).getDecimal(offsetInRow(pos), precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        return chooseRow(pos).getTimestamp(offsetInRow(pos), precision);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return chooseRow(pos).getBinary(offsetInRow(pos));
+    }
+
+    @Override
+    public Variant getVariant(int pos) {
+        return chooseRow(pos).getVariant(offsetInRow(pos));
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return chooseRow(pos).getArray(offsetInRow(pos));
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        return chooseRow(pos).getMap(offsetInRow(pos));
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return chooseRow(pos).getRow(offsetInRow(pos), numFields);
+    }
+}
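
The two offset arrays carry the whole mapping: field i of the assembled row is read from rows[rowOffsets[i]] at position fieldOffsets[i], and -1 marks a column that exists in no file and is always null. A hedged sketch using the same layout as the tests added in this commit:

    import org.apache.paimon.data.BinaryString;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.reader.DataEvolutionRow;

    public class RowMappingSketch {
        public static void main(String[] args) {
            // Merged schema has 4 fields: f0 and f3 live in row 0, f1 in row 1,
            // f2 was added by schema evolution and exists in neither file.
            int[] rowOffsets = {0, 1, -1, 0};
            int[] fieldOffsets = {0, 0, -1, 1};
            DataEvolutionRow row = new DataEvolutionRow(2, rowOffsets, fieldOffsets);
            row.setRow(0, GenericRow.of(1, BinaryString.fromString("a")));
            row.setRow(1, GenericRow.of(100L));

            System.out.println(row.getInt(0));    // 1   -> rows[0], position 0
            System.out.println(row.getLong(1));   // 100 -> rows[1], position 0
            System.out.println(row.isNullAt(2));  // true: rowOffsets[2] == -1
            System.out.println(row.getString(3)); // a   -> rows[0], position 1
        }
    }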
diff --git a/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java
new file mode 100644
index 0000000000..6dd7d61de6
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.InternalRow;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link DataEvolutionIterator}. */
+public class DataEvolutionIteratorTest {
+
+    private DataEvolutionRow mockRow;
+    private RecordReader.RecordIterator<InternalRow> iterator1;
+    private RecordReader.RecordIterator<InternalRow> iterator2;
+    private InternalRow row1;
+    private InternalRow row2;
+
+    @BeforeEach
+    public void setUp() {
+        mockRow = mock(DataEvolutionRow.class);
+        iterator1 = mock(RecordReader.RecordIterator.class);
+        iterator2 = mock(RecordReader.RecordIterator.class);
+        row1 = mock(InternalRow.class);
+        row2 = mock(InternalRow.class);
+    }
+
+    @Test
+    public void testNextWithData() throws IOException {
+        when(iterator1.next()).thenReturn(row1).thenReturn(null);
+        when(iterator2.next()).thenReturn(row2).thenReturn(null);
+
+        DataEvolutionIterator evolutionIterator =
+                new DataEvolutionIterator(
+                        mockRow, new RecordReader.RecordIterator[] {iterator1, iterator2});
+
+        // First call to next()
+        InternalRow result = evolutionIterator.next();
+        assertThat(result).isSameAs(mockRow);
+
+        InOrder inOrder = inOrder(iterator1, iterator2, mockRow);
+        inOrder.verify(iterator1).next();
+        inOrder.verify(mockRow).setRow(0, row1);
+        inOrder.verify(iterator2).next();
+        inOrder.verify(mockRow).setRow(1, row2);
+
+        // Second call to next() should return null
+        InternalRow nullResult = evolutionIterator.next();
+        assertThat(nullResult).isNull();
+        verify(iterator1, times(2)).next();
+        verify(iterator2, times(1)).next(); // Should not be called again
+    }
+
+    @Test
+    public void testNextWhenFirstIteratorIsEmpty() throws IOException {
+        when(iterator1.next()).thenReturn(null);
+
+        DataEvolutionIterator evolutionIterator =
+                new DataEvolutionIterator(
+                        mockRow, new RecordReader.RecordIterator[] {iterator1, iterator2});
+
+        InternalRow result = evolutionIterator.next();
+        assertThat(result).isNull();
+
+        verify(iterator1).next();
+        verify(iterator2, never()).next();
+        verify(mockRow, never()).setRow(anyInt(), any());
+    }
+
+    @Test
+    public void testNextWithNullIteratorInArray() throws IOException {
+        when(iterator1.next()).thenReturn(row1).thenReturn(null);
+        when(iterator2.next()).thenReturn(row2).thenReturn(null);
+
+        DataEvolutionIterator evolutionIterator =
+                new DataEvolutionIterator(
+                        mockRow, new RecordReader.RecordIterator[] {iterator1, null, iterator2});
+
+        InternalRow result = evolutionIterator.next();
+        assertThat(result).isSameAs(mockRow);
+
+        InOrder inOrder = inOrder(iterator1, iterator2, mockRow);
+        inOrder.verify(iterator1).next();
+        inOrder.verify(mockRow).setRow(0, row1);
+        inOrder.verify(iterator2).next();
+        inOrder.verify(mockRow).setRow(2, row2);
+        verify(mockRow, never()).setRow(1, null); // Check that index 1 is skipped
+
+        // Next call returns null
+        InternalRow nullResult = evolutionIterator.next();
+        assertThat(nullResult).isNull();
+    }
+
+    @Test
+    public void testNextWithEmptyIterators() throws IOException {
+        DataEvolutionIterator evolutionIterator =
+                new DataEvolutionIterator(mockRow, new RecordReader.RecordIterator[0]);
+
+        InternalRow result = evolutionIterator.next();
+        assertThat(result).isSameAs(mockRow);
+
+        verify(mockRow, never()).setRow(anyInt(), any());
+    }
+
+    @Test
+    public void testReleaseBatch() {
+        RecordReader.RecordIterator<InternalRow> iterator3 =
+                mock(RecordReader.RecordIterator.class);
+        DataEvolutionIterator evolutionIterator =
+                new DataEvolutionIterator(
+                        mockRow,
+                        new RecordReader.RecordIterator[] {iterator1, null, iterator2, iterator3});
+
+        evolutionIterator.releaseBatch();
+
+        verify(iterator1).releaseBatch();
+        verify(iterator2).releaseBatch();
+        verify(iterator3).releaseBatch();
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionRowTest.java b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionRowTest.java
new file mode 100644
index 0000000000..d1b40cd0b2
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionRowTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link DataEvolutionRow}. */
+public class DataEvolutionRowTest {
+
+    private InternalRow row1;
+    private InternalRow row2;
+    private DataEvolutionRow dataEvolutionRow;
+
+    @BeforeEach
+    public void setUp() {
+        row1 = mock(InternalRow.class);
+        row2 = mock(InternalRow.class);
+
+        // Schema: (from row1), (from row2), (null), (from row1)
+        int[] rowOffsets = new int[] {0, 1, -1, 0};
+        int[] fieldOffsets = new int[] {0, 0, -1, 1};
+
+        dataEvolutionRow = new DataEvolutionRow(2, rowOffsets, fieldOffsets);
+        dataEvolutionRow.setRow(0, row1);
+        dataEvolutionRow.setRow(1, row2);
+    }
+
+    @Test
+    public void testGetFieldCount() {
+        assertThat(dataEvolutionRow.getFieldCount()).isEqualTo(4);
+    }
+
+    @Test
+    public void testSetRowOutOfBounds() {
+        assertThatThrownBy(() -> dataEvolutionRow.setRow(2, mock(InternalRow.class)))
+                .isInstanceOf(IndexOutOfBoundsException.class)
+                .hasMessage("Position 2 is out of bounds for rows size 2");
+    }
+
+    @Test
+    public void testRowKind() {
+        dataEvolutionRow.setRowKind(RowKind.INSERT);
+        verify(row1).setRowKind(RowKind.INSERT);
+
+        when(row1.getRowKind()).thenReturn(RowKind.DELETE);
+        assertThat(dataEvolutionRow.getRowKind()).isEqualTo(RowKind.DELETE);
+    }
+
+    @Test
+    public void testIsNullAt() {
+        // Test null from rowOffsets (field added by schema evolution)
+        assertThat(dataEvolutionRow.isNullAt(2)).isTrue();
+
+        // Test null from underlying row
+        when(row1.isNullAt(0)).thenReturn(true);
+        assertThat(dataEvolutionRow.isNullAt(0)).isTrue();
+
+        // Test not null
+        when(row2.isNullAt(0)).thenReturn(false);
+        assertThat(dataEvolutionRow.isNullAt(1)).isFalse();
+    }
+
+    @Test
+    public void testGetBoolean() {
+        when(row1.getBoolean(0)).thenReturn(true);
+        assertThat(dataEvolutionRow.getBoolean(0)).isTrue();
+    }
+
+    @Test
+    public void testGetByte() {
+        when(row1.getByte(0)).thenReturn((byte) 1);
+        assertThat(dataEvolutionRow.getByte(0)).isEqualTo((byte) 1);
+    }
+
+    @Test
+    public void testGetShort() {
+        when(row1.getShort(0)).thenReturn((short) 2);
+        assertThat(dataEvolutionRow.getShort(0)).isEqualTo((short) 2);
+    }
+
+    @Test
+    public void testGetInt() {
+        when(row1.getInt(0)).thenReturn(3);
+        assertThat(dataEvolutionRow.getInt(0)).isEqualTo(3);
+    }
+
+    @Test
+    public void testGetLong() {
+        when(row2.getLong(0)).thenReturn(4L);
+        assertThat(dataEvolutionRow.getLong(1)).isEqualTo(4L);
+    }
+
+    @Test
+    public void testGetFloat() {
+        when(row1.getFloat(1)).thenReturn(5.5f);
+        assertThat(dataEvolutionRow.getFloat(3)).isEqualTo(5.5f);
+    }
+
+    @Test
+    public void testGetDouble() {
+        when(row2.getDouble(0)).thenReturn(6.6d);
+        assertThat(dataEvolutionRow.getDouble(1)).isEqualTo(6.6d);
+    }
+
+    @Test
+    public void testGetString() {
+        BinaryString value = BinaryString.fromString("test");
+        when(row1.getString(1)).thenReturn(value);
+        assertThat(dataEvolutionRow.getString(3)).isSameAs(value);
+    }
+
+    @Test
+    public void testGetDecimal() {
+        Decimal value = Decimal.fromUnscaledLong(123, 5, 2);
+        when(row1.getDecimal(1, 5, 2)).thenReturn(value);
+        assertThat(dataEvolutionRow.getDecimal(3, 5, 2)).isSameAs(value);
+    }
+
+    @Test
+    public void testGetTimestamp() {
+        Timestamp value = Timestamp.fromEpochMillis(1000);
+        when(row2.getTimestamp(0, 3)).thenReturn(value);
+        assertThat(dataEvolutionRow.getTimestamp(1, 3)).isSameAs(value);
+    }
+
+    @Test
+    public void testGetBinary() {
+        byte[] value = new byte[] {1, 2, 3};
+        when(row1.getBinary(1)).thenReturn(value);
+        assertThat(dataEvolutionRow.getBinary(3)).isSameAs(value);
+    }
+
+    @Test
+    public void testGetVariant() {
+        Variant value = mock(Variant.class);
+        when(row1.getVariant(1)).thenReturn(value);
+        assertThat(dataEvolutionRow.getVariant(3)).isSameAs(value);
+    }
+
+    @Test
+    public void testGetArray() {
+        InternalArray value = mock(InternalArray.class);
+        when(row2.getArray(0)).thenReturn(value);
+        assertThat(dataEvolutionRow.getArray(1)).isSameAs(value);
+    }
+
+    @Test
+    public void testGetMap() {
+        InternalMap value = mock(InternalMap.class);
+        when(row1.getMap(1)).thenReturn(value);
+        assertThat(dataEvolutionRow.getMap(3)).isSameAs(value);
+    }
+
+    @Test
+    public void testGetRow() {
+        InternalRow value = mock(InternalRow.class);
+        when(row2.getRow(0, 5)).thenReturn(value);
+        assertThat(dataEvolutionRow.getRow(1, 5)).isSameAs(value);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index be3d1662c1..0d9ad3b121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -97,8 +97,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
                 schema,
                 rowType,
                 FileFormatDiscover.of(options),
-                pathFactory(),
-                options.fileIndexReadEnabled());
+                pathFactory());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index af1e5b767f..bf6bf1ea22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -20,15 +20,19 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatKey;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataFileRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.partition.PartitionUtils;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.DataEvolutionFileReader;
-import org.apache.paimon.reader.EmptyFileRecordReader;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
@@ -40,23 +44,36 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FormatReaderMapping;
 import org.apache.paimon.utils.FormatReaderMapping.Builder;
-import org.apache.paimon.utils.IOExceptionSupplier;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
-public class DataEvolutionSplitRead extends RawFileSplitRead {
+/**
+ * A union {@link SplitRead} to read multiple inner files to merge columns. Note that this class
+ * does not support filter push down or deletion vectors, as they can interfere with the process
+ * of merging columns.
+ */
+public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
+
+    private final FileIO fileIO;
+    private final SchemaManager schemaManager;
+    private final TableSchema schema;
+    private final FileFormatDiscover formatDiscover;
+    private final FileStorePathFactory pathFactory;
+    private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
+
+    protected RowType readRowType;
 
     public DataEvolutionSplitRead(
             FileIO fileIO,
@@ -64,66 +81,83 @@ public class DataEvolutionSplitRead extends RawFileSplitRead {
             TableSchema schema,
             RowType rowType,
             FileFormatDiscover formatDiscover,
-            FileStorePathFactory pathFactory,
-            // TODO: Enabled file index in merge fields read
-            boolean fileIndexReadEnabled) {
-        super(
-                fileIO,
-                schemaManager,
-                schema,
-                rowType,
-                formatDiscover,
-                pathFactory,
-                fileIndexReadEnabled,
-                true);
+            FileStorePathFactory pathFactory) {
+        this.fileIO = fileIO;
+        this.schemaManager = schemaManager;
+        this.schema = schema;
+        this.formatDiscover = formatDiscover;
+        this.pathFactory = pathFactory;
+        this.formatReaderMappings = new HashMap<>();
+        this.readRowType = rowType;
     }
 
     @Override
-    public RecordReader<InternalRow> createReader(
-            BinaryRow partition,
-            int bucket,
-            List<DataFileMeta> files,
-            @Nullable Map<String, IOExceptionSupplier<DeletionVector>> dvFactories)
-            throws IOException {
+    public SplitRead<InternalRow> forceKeepDelete() {
+        return this;
+    }
+
+    @Override
+    public SplitRead<InternalRow> withIOManager(@Nullable IOManager ioManager) {
+        return this;
+    }
+
+    @Override
+    public SplitRead<InternalRow> withReadType(RowType readRowType) {
+        this.readRowType = readRowType;
+        return this;
+    }
+
+    @Override
+    public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
+        return this;
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
+        List<DataFileMeta> files = split.dataFiles();
+        BinaryRow partition = split.partition();
         DataFilePathFactory dataFilePathFactory =
-                pathFactory.createDataFilePathFactory(partition, bucket);
+                pathFactory.createDataFilePathFactory(partition, split.bucket());
         List<ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
 
-        Builder formatReaderMappingBuilder = formatBuilder();
+        Builder formatBuilder =
+                new Builder(
+                        formatDiscover,
+                        readRowType.getFields(),
+                        schema -> rowTypeWithRowLineage(schema.logicalRowType(), true).getFields(),
+                        null);
 
         List<List<DataFileMeta>> splitByRowId = DataEvolutionSplitGenerator.split(files);
         for (List<DataFileMeta> needMergeFiles : splitByRowId) {
             if (needMergeFiles.size() == 1) {
                 // No need to merge fields, just create a single file reader
                 suppliers.add(
-                        createFileReader(
-                                partition,
-                                dataFilePathFactory,
-                                needMergeFiles.get(0),
-                                formatReaderMappingBuilder,
-                                dvFactories));
+                        () ->
+                                createFileReader(
+                                        partition,
+                                        dataFilePathFactory,
+                                        needMergeFiles.get(0),
+                                        formatBuilder));
 
             } else {
                 suppliers.add(
                         () ->
-                                createFileReader(
+                                createUnionReader(
                                         needMergeFiles,
                                         partition,
                                         dataFilePathFactory,
-                                        formatReaderMappingBuilder,
-                                        dvFactories));
+                                        formatBuilder));
             }
         }
 
         return ConcatRecordReader.create(suppliers);
     }
 
-    private DataEvolutionFileReader createFileReader(
+    private DataEvolutionFileReader createUnionReader(
             List<DataFileMeta> needMergeFiles,
             BinaryRow partition,
             DataFilePathFactory dataFilePathFactory,
-            Builder formatReaderMappingBuilder,
-            @Nullable Map<String, IOExceptionSupplier<DeletionVector>> dvFactories)
+            Builder formatBuilder)
             throws IOException {
         long rowCount = needMergeFiles.get(0).rowCount();
         long firstRowId = needMergeFiles.get(0).firstRowId();
@@ -147,17 +181,6 @@ public class DataEvolutionSplitRead extends RawFileSplitRead {
         Arrays.fill(rowOffsets, -1);
         Arrays.fill(fieldOffsets, -1);
 
-        IOExceptionSupplier<DeletionVector> dvFactory = null;
-        if (dvFactories != null) {
-            for (DataFileMeta file : needMergeFiles) {
-                IOExceptionSupplier<DeletionVector> temp = dvFactories.get(file.fileName());
-                if (temp != null && temp.get() != null) {
-                    dvFactory = temp;
-                    break;
-                }
-            }
-        }
-
         for (int i = 0; i < needMergeFiles.size(); i++) {
             DataFileMeta file = needMergeFiles.get(i);
             String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
@@ -185,34 +208,26 @@ public class DataEvolutionSplitRead extends RawFileSplitRead {
             }
 
             if (readFields.isEmpty()) {
-                fileRecordReaders[i] = new EmptyFileRecordReader<>();
-                continue;
+                fileRecordReaders[i] = null;
+            } else {
+                // create a new FormatReaderMapping for reading partial fields
+                List<String> readFieldNames =
+                        readFields.stream().map(DataField::name).collect(Collectors.toList());
+                FormatReaderMapping formatReaderMapping =
+                        formatReaderMappings.computeIfAbsent(
+                                new FormatKey(file.schemaId(), formatIdentifier, readFieldNames),
+                                key ->
+                                        formatBuilder.build(
+                                                formatIdentifier,
+                                                schema,
+                                                dataSchema,
+                                                readFields,
+                                                false));
+                fileRecordReaders[i] =
+                        createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
             }
-
-            Supplier<FormatReaderMapping> formatSupplier =
-                    () ->
-                            formatReaderMappingBuilder.build(
-                                    formatIdentifier,
-                                    schema,
-                                    dataSchema,
-                                    readFields,
-                                    // TODO: enabled filter push down
-                                    false);
-
-            FormatReaderMapping formatReaderMapping =
-                    formatReaderMappings.computeIfAbsent(
-                            new FormatKey(
-                                    file.schemaId(),
-                                    formatIdentifier,
-                                    readFields.stream()
-                                            .map(DataField::name)
-                                            .collect(Collectors.toList())),
-                            key -> formatSupplier.get());
-
-            fileRecordReaders[i] =
-                    createFileReader(
-                            partition, file, dataFilePathFactory, formatReaderMapping, dvFactory);
         }
+
         for (int i = 0; i < rowOffsets.length; i++) {
             if (rowOffsets[i] == -1) {
                 checkArgument(
@@ -222,6 +237,50 @@ public class DataEvolutionSplitRead extends RawFileSplitRead {
                                 allReadFields.get(i)));
             }
         }
+
         return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders);
     }
+
+    private FileRecordReader<InternalRow> createFileReader(
+            BinaryRow partition,
+            DataFilePathFactory dataFilePathFactory,
+            DataFileMeta file,
+            Builder formatBuilder)
+            throws IOException {
+        String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
+        long schemaId = file.schemaId();
+        FormatReaderMapping formatReaderMapping =
+                formatReaderMappings.computeIfAbsent(
+                        new FormatKey(file.schemaId(), formatIdentifier),
+                        key ->
+                                formatBuilder.build(
+                                        formatIdentifier,
+                                        schema,
+                                        schemaId == schema.id()
+                                                ? schema
+                                                : schemaManager.schema(schemaId));
+        return createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
+    }
+
+    private FileRecordReader<InternalRow> createFileReader(
+            BinaryRow partition,
+            DataFileMeta file,
+            DataFilePathFactory dataFilePathFactory,
+            FormatReaderMapping formatReaderMapping)
+            throws IOException {
+        FormatReaderContext formatReaderContext =
+                new FormatReaderContext(
+                        fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
+        return new DataFileRecordReader(
+                schema.logicalRowType(),
+                formatReaderMapping.getReaderFactory(),
+                formatReaderContext,
+                formatReaderMapping.getIndexMapping(),
+                formatReaderMapping.getCastMapping(),
+                PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
+                true,
+                file.firstRowId(),
+                file.maxSequenceNumber(),
+                formatReaderMapping.getSystemFields());
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 67591e1bf4..6aac9de937 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -44,7 +44,6 @@ import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -63,9 +62,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Supplier;
 
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
 
 /** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
 public class RawFileSplitRead implements SplitRead<InternalRow> {
@@ -73,16 +72,16 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
     private static final Logger LOG = LoggerFactory.getLogger(RawFileSplitRead.class);
 
     private final FileIO fileIO;
-    protected final SchemaManager schemaManager;
-    protected final TableSchema schema;
-    protected final FileFormatDiscover formatDiscover;
-    protected final FileStorePathFactory pathFactory;
-    protected final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
-    protected final boolean fileIndexReadEnabled;
-    protected final boolean rowTrackingEnabled;
+    private final SchemaManager schemaManager;
+    private final TableSchema schema;
+    private final FileFormatDiscover formatDiscover;
+    private final FileStorePathFactory pathFactory;
+    private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
+    private final boolean fileIndexReadEnabled;
+    private final boolean rowTrackingEnabled;
 
-    protected RowType readRowType;
-    @Nullable protected List<Predicate> filters;
+    private RowType readRowType;
+    @Nullable private List<Predicate> filters;
 
     public RawFileSplitRead(
             FileIO fileIO,
@@ -130,7 +129,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
 
     @Override
     public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
-        if (split.beforeFiles().size() > 0) {
+        if (!split.beforeFiles().isEmpty()) {
             LOG.info("Ignore split before files: {}", split.beforeFiles());
         }
 
@@ -154,7 +153,18 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
                 pathFactory.createDataFilePathFactory(partition, bucket);
         List<ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
 
-        Builder formatReaderMappingBuilder = formatBuilder();
+        Builder formatReaderMappingBuilder =
+                new Builder(
+                        formatDiscover,
+                        readRowType.getFields(),
+                        schema -> {
+                            if (rowTrackingEnabled) {
+                                return rowTypeWithRowLineage(schema.logicalRowType(), true)
+                                        .getFields();
+                            }
+                            return schema.fields();
+                        },
+                        filters);
 
         for (DataFileMeta file : files) {
             suppliers.add(
@@ -169,26 +179,25 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
         return ConcatRecordReader.create(suppliers);
     }
 
-    protected ReaderSupplier<InternalRow> createFileReader(
+    private ReaderSupplier<InternalRow> createFileReader(
             BinaryRow partition,
             DataFilePathFactory dataFilePathFactory,
             DataFileMeta file,
-            Builder formatReaderMappingBuilder,
+            Builder formatBuilder,
             @Nullable Map<String, IOExceptionSupplier<DeletionVector>> dvFactories) {
         String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
         long schemaId = file.schemaId();
 
-        Supplier<FormatReaderMapping> formatSupplier =
-                () ->
-                        formatReaderMappingBuilder.build(
-                                formatIdentifier,
-                                schema,
-                                schemaId == schema.id() ? schema : schemaManager.schema(schemaId));
-
         FormatReaderMapping formatReaderMapping =
                 formatReaderMappings.computeIfAbsent(
                         new FormatKey(file.schemaId(), formatIdentifier),
-                        key -> formatSupplier.get());
+                        key ->
+                                formatBuilder.build(
+                                        formatIdentifier,
+                                        schema,
+                                        schemaId == schema.id()
+                                                ? schema
+                                                : schemaManager.schema(schemaId));
 
         IOExceptionSupplier<DeletionVector> dvFactory =
                 dvFactories == null ? null : dvFactories.get(file.fileName());
@@ -197,7 +206,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
                         partition, file, dataFilePathFactory, formatReaderMapping, dvFactory);
     }
 
-    protected FileRecordReader<InternalRow> createFileReader(
+    private FileRecordReader<InternalRow> createFileReader(
             BinaryRow partition,
             DataFileMeta file,
             DataFilePathFactory dataFilePathFactory,
@@ -265,19 +274,4 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
         }
         return fileRecordReader;
     }
-
-    protected Builder formatBuilder() {
-        return new Builder(
-                formatDiscover,
-                readRowType.getFields(),
-                tableSchema -> {
-                    if (rowTrackingEnabled) {
-                        return SpecialFields.rowTypeWithRowLineage(
-                                        tableSchema.logicalRowType(), true)
-                                .getFields();
-                    }
-                    return tableSchema.fields();
-                },
-                filters);
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 06d7d4117f..064630023d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -238,9 +238,7 @@ public class SchemaValidation {
 
         validateMergeFunctionFactory(schema);
 
-        validateRowLineage(schema, options);
-
-        validateDataEvolution(options);
+        validateRowTracking(schema, options);
     }
 
     public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
@@ -636,8 +634,9 @@ public class SchemaValidation {
         }
     }
 
-    private static void validateRowLineage(TableSchema schema, CoreOptions options) {
-        if (options.rowTrackingEnabled()) {
+    private static void validateRowTracking(TableSchema schema, CoreOptions options) {
+        boolean rowTrackingEnabled = options.rowTrackingEnabled();
+        if (rowTrackingEnabled) {
             checkArgument(
                     options.bucket() == -1,
                     "Cannot define %s for row lineage table, it only support 
bucket = -1",
@@ -647,13 +646,14 @@ public class SchemaValidation {
                     "Cannot define %s for row lineage table.",
                     PRIMARY_KEY.key());
         }
-    }
 
-    private static void validateDataEvolution(CoreOptions options) {
         if (options.dataEvolutionEnabled()) {
             checkArgument(
-                    options.rowTrackingEnabled(),
+                    rowTrackingEnabled,
                     "Data evolution config must enabled with 
row-tracking.enabled");
+            checkArgument(
+                    !options.deletionVectorsEnabled(),
+                    "Data evolution cannot be enabled together with deletion-vectors.enabled");
         }
     }
 }
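
Read together, the rules above are: row tracking requires bucket = -1 and no primary key, and data evolution requires row tracking and forbids deletion vectors. A condensed, hedged restatement as standalone code (names and the exception type are illustrative, not the actual SchemaValidation API):

    public class RowTrackingValidationSketch {

        static void validate(boolean rowTracking, boolean dataEvolution,
                             boolean deletionVectors, int bucket, boolean hasPrimaryKey) {
            if (rowTracking) {
                check(bucket == -1,
                        "Cannot define bucket for row lineage table, it only supports bucket = -1");
                check(!hasPrimaryKey, "Cannot define primary-key for row lineage table.");
            }
            if (dataEvolution) {
                check(rowTracking,
                        "Data evolution config must be enabled with row-tracking.enabled");
                check(!deletionVectors,
                        "Data evolution cannot be enabled together with deletion-vectors.enabled");
            }
        }

        static void check(boolean condition, String message) {
            if (!condition) {
                throw new IllegalArgumentException(message);
            }
        }

        public static void main(String[] args) {
            validate(true, true, false, -1, false); // passes
            try {
                validate(false, true, false, -1, false); // data evolution without row tracking
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }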
