This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a7f1455bd8 [core] Refactor and add tests for data evolution union read (#6053)
a7f1455bd8 is described below
commit a7f1455bd8801302d0b45db87a0b627bb3a2b08d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 11 15:42:30 2025 +0800
[core] Refactor and add tests for data evolution union read (#6053)
---
.../paimon/reader/DataEvolutionFileReader.java | 210 ++-------------------
.../paimon/reader/DataEvolutionIterator.java | 65 +++++++
.../org/apache/paimon/reader/DataEvolutionRow.java | 161 ++++++++++++++++
.../paimon/reader/DataEvolutionIteratorTest.java | 149 +++++++++++++++
.../apache/paimon/reader/DataEvolutionRowTest.java | 192 +++++++++++++++++++
.../org/apache/paimon/AppendOnlyFileStore.java | 3 +-
.../paimon/operation/DataEvolutionSplitRead.java | 209 ++++++++++++--------
.../apache/paimon/operation/RawFileSplitRead.java | 72 ++++---
.../org/apache/paimon/schema/SchemaValidation.java | 16 +-
9 files changed, 760 insertions(+), 317 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
index 9c75f0e114..1c50410fa1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
@@ -18,14 +18,7 @@
package org.apache.paimon.reader;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.variant.Variant;
-import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.IOUtils;
import javax.annotation.Nullable;
@@ -35,7 +28,7 @@ import java.io.IOException;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
- * This is a union reader which contains multiple inner reader.
+ * This is a union reader which contains multiple inner readers.
*
* <pre> This reader, assembling multiple reader into one big and great reader. The row it produces
* also come from the readers it contains.
@@ -63,7 +56,7 @@ public class DataEvolutionFileReader implements RecordReader<InternalRow> {
private final int[] rowOffsets;
private final int[] fieldOffsets;
- private final RecordReader<InternalRow>[] innerReaders;
+ private final RecordReader<InternalRow>[] readers;
public DataEvolutionFileReader(
int[] rowOffsets, int[] fieldOffsets, RecordReader<InternalRow>[] readers) {
@@ -76,204 +69,35 @@ public class DataEvolutionFileReader implements RecordReader<InternalRow> {
checkArgument(readers != null && readers.length > 1, "Readers should be more than 1");
this.rowOffsets = rowOffsets;
this.fieldOffsets = fieldOffsets;
- this.innerReaders = readers;
+ this.readers = readers;
}
@Override
@Nullable
public RecordIterator<InternalRow> readBatch() throws IOException {
- DataEvolutionIterator iterator =
-                new DataEvolutionIterator(innerReaders.length, rowOffsets, fieldOffsets);
- for (int i = 0; i < innerReaders.length; i++) {
- RecordIterator<InternalRow> batch = innerReaders[i].readBatch();
-            if (batch == null && !(innerReaders[i] instanceof EmptyFileRecordReader)) {
- return null;
+        DataEvolutionRow row = new DataEvolutionRow(readers.length, rowOffsets, fieldOffsets);
+        RecordIterator<InternalRow>[] iterators = new RecordIterator[readers.length];
+ for (int i = 0; i < readers.length; i++) {
+ RecordReader<InternalRow> reader = readers[i];
+ if (reader != null) {
+ RecordIterator<InternalRow> batch = reader.readBatch();
+ if (batch == null) {
+                    // all readers are aligned, as long as one returns null, the others will also
+                    // have no data
+ return null;
+ }
+ iterators[i] = batch;
}
- iterator.set(i, batch);
}
-
- return iterator;
+ return new DataEvolutionIterator(row, iterators);
}
@Override
public void close() throws IOException {
try {
- IOUtils.closeAll(innerReaders);
+ IOUtils.closeAll(readers);
} catch (Exception e) {
throw new IOException("Failed to close inner readers", e);
}
}
-
- /** The batch which is made up by several batches. */
-    private static class DataEvolutionIterator implements RecordIterator<InternalRow> {
-
- private final DataEvolutionRow dataEvolutionRow;
- private final RecordIterator<InternalRow>[] iterators;
-
- private DataEvolutionIterator(
- int rowNumber,
- int[] rowOffsets,
-                int[] fieldOffsets) { // Initialize with empty arrays, will be set later
-            this.dataEvolutionRow = new DataEvolutionRow(rowNumber, rowOffsets, fieldOffsets);
- //noinspection unchecked
- this.iterators = new RecordIterator[rowNumber];
- }
-
- public void set(int i, RecordIterator<InternalRow> iterator) {
- iterators[i] = iterator;
- }
-
- @Nullable
- @Override
- public InternalRow next() throws IOException {
- for (int i = 0; i < iterators.length; i++) {
- if (iterators[i] != null) {
- InternalRow next = iterators[i].next();
- if (next == null) {
- return null;
- }
- dataEvolutionRow.setRow(i, next);
- }
- }
- return dataEvolutionRow;
- }
-
- @Override
- public void releaseBatch() {
- for (RecordIterator<InternalRow> iterator : iterators) {
- if (iterator != null) {
- iterator.releaseBatch();
- }
- }
- }
- }
-
- /** The row which is made up by several rows. */
- private static class DataEvolutionRow implements InternalRow {
-
- private final InternalRow[] rows;
- private final int[] rowOffsets;
- private final int[] fieldOffsets;
-
-        private DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
- this.rows = new InternalRow[rowNumber];
- this.rowOffsets = rowOffsets;
- this.fieldOffsets = fieldOffsets;
- }
-
- private void setRow(int pos, InternalRow row) {
- if (pos >= rows.length) {
- throw new IndexOutOfBoundsException(
-                        "Position " + pos + " is out of bounds for rows size " + rows.length);
- } else {
- rows[pos] = row;
- }
- }
-
- private InternalRow chooseRow(int pos) {
- return rows[(rowOffsets[pos])];
- }
-
- private int offsetInRow(int pos) {
- return fieldOffsets[pos];
- }
-
- @Override
- public int getFieldCount() {
- return fieldOffsets.length;
- }
-
- @Override
- public RowKind getRowKind() {
- return rows[0].getRowKind();
- }
-
- @Override
- public void setRowKind(RowKind kind) {
- rows[0].setRowKind(kind);
- }
-
- @Override
- public boolean isNullAt(int pos) {
- if (rowOffsets[pos] == -1) {
- return true;
- }
- return chooseRow(pos).isNullAt(offsetInRow(pos));
- }
-
- @Override
- public boolean getBoolean(int pos) {
- return chooseRow(pos).getBoolean(offsetInRow(pos));
- }
-
- @Override
- public byte getByte(int pos) {
- return chooseRow(pos).getByte(offsetInRow(pos));
- }
-
- @Override
- public short getShort(int pos) {
- return chooseRow(pos).getShort(offsetInRow(pos));
- }
-
- @Override
- public int getInt(int pos) {
- return chooseRow(pos).getInt(offsetInRow(pos));
- }
-
- @Override
- public long getLong(int pos) {
- return chooseRow(pos).getLong(offsetInRow(pos));
- }
-
- @Override
- public float getFloat(int pos) {
- return chooseRow(pos).getFloat(offsetInRow(pos));
- }
-
- @Override
- public double getDouble(int pos) {
- return chooseRow(pos).getDouble(offsetInRow(pos));
- }
-
- @Override
- public BinaryString getString(int pos) {
- return chooseRow(pos).getString(offsetInRow(pos));
- }
-
- @Override
- public Decimal getDecimal(int pos, int precision, int scale) {
-            return chooseRow(pos).getDecimal(offsetInRow(pos), precision, scale);
- }
-
- @Override
- public Timestamp getTimestamp(int pos, int precision) {
- return chooseRow(pos).getTimestamp(offsetInRow(pos), precision);
- }
-
- @Override
- public byte[] getBinary(int pos) {
- return chooseRow(pos).getBinary(offsetInRow(pos));
- }
-
- @Override
- public Variant getVariant(int pos) {
- return chooseRow(pos).getVariant(offsetInRow(pos));
- }
-
- @Override
- public InternalArray getArray(int pos) {
- return chooseRow(pos).getArray(offsetInRow(pos));
- }
-
- @Override
- public InternalMap getMap(int pos) {
- return chooseRow(pos).getMap(offsetInRow(pos));
- }
-
- @Override
- public InternalRow getRow(int pos, int numFields) {
- return chooseRow(pos).getRow(offsetInRow(pos), numFields);
- }
- }
}
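To make the refactored wiring concrete, here is a minimal sketch (not part of this commit) of how the union reader is assembled. readerAB and readerC are hypothetical row-aligned readers, one producing fields (a, b) and the other field (c); rowOffsets picks the inner reader for each output field and fieldOffsets picks the position inside that reader's row:

    // Hypothetical aligned readers: readers[0] yields rows (a, b), readers[1] yields rows (c).
    RecordReader<InternalRow>[] readers = new RecordReader[] {readerAB, readerC};
    // Output schema (a, b, c): field i is read from readers[rowOffsets[i]] at fieldOffsets[i];
    // a -1 entry means the column exists in no file and is always null.
    int[] rowOffsets = {0, 0, 1};
    int[] fieldOffsets = {0, 1, 0};
    RecordReader<InternalRow> union = new DataEvolutionFileReader(rowOffsets, fieldOffsets, readers);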
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java
new file mode 100644
index 0000000000..17a68454f0
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * A batch which is made up of several batches. It assumes that all iterators are aligned, and as
+ * long as one returns null, the others will also have no data.
+ */
+public class DataEvolutionIterator implements RecordIterator<InternalRow> {
+
+ private final DataEvolutionRow row;
+ private final RecordIterator<InternalRow>[] iterators;
+
+    public DataEvolutionIterator(DataEvolutionRow row, RecordIterator<InternalRow>[] iterators) {
+ this.row = row;
+ this.iterators = iterators;
+ }
+
+ @Nullable
+ @Override
+ public InternalRow next() throws IOException {
+ for (int i = 0; i < iterators.length; i++) {
+ if (iterators[i] != null) {
+ InternalRow next = iterators[i].next();
+ if (next == null) {
+ return null;
+ }
+ row.setRow(i, next);
+ }
+ }
+ return row;
+ }
+
+ @Override
+ public void releaseBatch() {
+ for (RecordIterator<InternalRow> iterator : iterators) {
+ if (iterator != null) {
+ iterator.releaseBatch();
+ }
+ }
+ }
+}
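DataEvolutionIterator follows the usual RecordReader.RecordIterator contract, so it is drained like any other Paimon reader. A hedged consumption sketch follows (the standard reader pattern, not code from this commit; 'union' is the placeholder reader from the sketch above). Note that every call to next() returns the same reused DataEvolutionRow, so values must be read or copied before advancing:

    RecordReader<InternalRow> reader = union; // e.g. a DataEvolutionFileReader (placeholder)
    try {
        RecordReader.RecordIterator<InternalRow> batch;
        while ((batch = reader.readBatch()) != null) {
            InternalRow row;
            while ((row = batch.next()) != null) {
                // consume the assembled row, e.g. row.getInt(0), row.getString(2)
            }
            batch.releaseBatch();
        }
    } finally {
        reader.close();
    }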
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
new file mode 100644
index 0000000000..db357ae6a6
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+/** A row which is made up of several rows. */
+public class DataEvolutionRow implements InternalRow {
+
+ private final InternalRow[] rows;
+ private final int[] rowOffsets;
+ private final int[] fieldOffsets;
+
+    public DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
+ this.rows = new InternalRow[rowNumber];
+ this.rowOffsets = rowOffsets;
+ this.fieldOffsets = fieldOffsets;
+ }
+
+ public int rowNumber() {
+ return rows.length;
+ }
+
+ public void setRow(int pos, InternalRow row) {
+ if (pos >= rows.length) {
+ throw new IndexOutOfBoundsException(
+                    "Position " + pos + " is out of bounds for rows size " + rows.length);
+ } else {
+ rows[pos] = row;
+ }
+ }
+
+ private InternalRow chooseRow(int pos) {
+ return rows[(rowOffsets[pos])];
+ }
+
+ private int offsetInRow(int pos) {
+ return fieldOffsets[pos];
+ }
+
+ @Override
+ public int getFieldCount() {
+ return fieldOffsets.length;
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return rows[0].getRowKind();
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ rows[0].setRowKind(kind);
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ if (rowOffsets[pos] == -1) {
+ return true;
+ }
+ return chooseRow(pos).isNullAt(offsetInRow(pos));
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return chooseRow(pos).getBoolean(offsetInRow(pos));
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return chooseRow(pos).getByte(offsetInRow(pos));
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return chooseRow(pos).getShort(offsetInRow(pos));
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return chooseRow(pos).getInt(offsetInRow(pos));
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return chooseRow(pos).getLong(offsetInRow(pos));
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return chooseRow(pos).getFloat(offsetInRow(pos));
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return chooseRow(pos).getDouble(offsetInRow(pos));
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return chooseRow(pos).getString(offsetInRow(pos));
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ return chooseRow(pos).getDecimal(offsetInRow(pos), precision, scale);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ return chooseRow(pos).getTimestamp(offsetInRow(pos), precision);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return chooseRow(pos).getBinary(offsetInRow(pos));
+ }
+
+ @Override
+ public Variant getVariant(int pos) {
+ return chooseRow(pos).getVariant(offsetInRow(pos));
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ return chooseRow(pos).getArray(offsetInRow(pos));
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ return chooseRow(pos).getMap(offsetInRow(pos));
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ return chooseRow(pos).getRow(offsetInRow(pos), numFields);
+ }
+}
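For reference, the offset mapping can be illustrated with plain GenericRow values; this is an illustration only, mirroring the fixture used in DataEvolutionRowTest below:

    // Combined schema of 4 fields: f0 and f3 come from row1, f1 comes from row2,
    // f2 exists in no inner row (rowOffset -1) and therefore always reads as null.
    InternalRow row1 = GenericRow.of(10, 20L);                      // positions 0 and 1
    InternalRow row2 = GenericRow.of(BinaryString.fromString("x")); // position 0
    DataEvolutionRow combined =
            new DataEvolutionRow(2, new int[] {0, 1, -1, 0}, new int[] {0, 0, -1, 1});
    combined.setRow(0, row1);
    combined.setRow(1, row2);
    combined.getInt(0);    // 10, from row1 position 0
    combined.getString(1); // "x", from row2 position 0
    combined.isNullAt(2);  // true, column added by schema evolution
    combined.getLong(3);   // 20L, from row1 position 1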
diff --git a/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java
new file mode 100644
index 0000000000..6dd7d61de6
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.InternalRow;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link DataEvolutionIterator}. */
+public class DataEvolutionIteratorTest {
+
+ private DataEvolutionRow mockRow;
+ private RecordReader.RecordIterator<InternalRow> iterator1;
+ private RecordReader.RecordIterator<InternalRow> iterator2;
+ private InternalRow row1;
+ private InternalRow row2;
+
+ @BeforeEach
+ public void setUp() {
+ mockRow = mock(DataEvolutionRow.class);
+ iterator1 = mock(RecordReader.RecordIterator.class);
+ iterator2 = mock(RecordReader.RecordIterator.class);
+ row1 = mock(InternalRow.class);
+ row2 = mock(InternalRow.class);
+ }
+
+ @Test
+ public void testNextWithData() throws IOException {
+ when(iterator1.next()).thenReturn(row1).thenReturn(null);
+ when(iterator2.next()).thenReturn(row2).thenReturn(null);
+
+ DataEvolutionIterator evolutionIterator =
+ new DataEvolutionIterator(
+                        mockRow, new RecordReader.RecordIterator[] {iterator1, iterator2});
+
+ // First call to next()
+ InternalRow result = evolutionIterator.next();
+ assertThat(result).isSameAs(mockRow);
+
+ InOrder inOrder = inOrder(iterator1, iterator2, mockRow);
+ inOrder.verify(iterator1).next();
+ inOrder.verify(mockRow).setRow(0, row1);
+ inOrder.verify(iterator2).next();
+ inOrder.verify(mockRow).setRow(1, row2);
+
+ // Second call to next() should return null
+ InternalRow nullResult = evolutionIterator.next();
+ assertThat(nullResult).isNull();
+ verify(iterator1, times(2)).next();
+ verify(iterator2, times(1)).next(); // Should not be called again
+ }
+
+ @Test
+ public void testNextWhenFirstIteratorIsEmpty() throws IOException {
+ when(iterator1.next()).thenReturn(null);
+
+ DataEvolutionIterator evolutionIterator =
+ new DataEvolutionIterator(
+                        mockRow, new RecordReader.RecordIterator[] {iterator1, iterator2});
+
+ InternalRow result = evolutionIterator.next();
+ assertThat(result).isNull();
+
+ verify(iterator1).next();
+ verify(iterator2, never()).next();
+ verify(mockRow, never()).setRow(anyInt(), any());
+ }
+
+ @Test
+ public void testNextWithNullIteratorInArray() throws IOException {
+ when(iterator1.next()).thenReturn(row1).thenReturn(null);
+ when(iterator2.next()).thenReturn(row2).thenReturn(null);
+
+ DataEvolutionIterator evolutionIterator =
+ new DataEvolutionIterator(
+                        mockRow, new RecordReader.RecordIterator[] {iterator1, null, iterator2});
+
+ InternalRow result = evolutionIterator.next();
+ assertThat(result).isSameAs(mockRow);
+
+ InOrder inOrder = inOrder(iterator1, iterator2, mockRow);
+ inOrder.verify(iterator1).next();
+ inOrder.verify(mockRow).setRow(0, row1);
+ inOrder.verify(iterator2).next();
+ inOrder.verify(mockRow).setRow(2, row2);
+        verify(mockRow, never()).setRow(1, null); // Check that index 1 is skipped
+
+ // Next call returns null
+ InternalRow nullResult = evolutionIterator.next();
+ assertThat(nullResult).isNull();
+ }
+
+ @Test
+ public void testNextWithEmptyIterators() throws IOException {
+ DataEvolutionIterator evolutionIterator =
+                new DataEvolutionIterator(mockRow, new RecordReader.RecordIterator[0]);
+
+ InternalRow result = evolutionIterator.next();
+ assertThat(result).isSameAs(mockRow);
+
+ verify(mockRow, never()).setRow(anyInt(), any());
+ }
+
+ @Test
+ public void testReleaseBatch() {
+        RecordReader.RecordIterator<InternalRow> iterator3 =
+                mock(RecordReader.RecordIterator.class);
+ DataEvolutionIterator evolutionIterator =
+ new DataEvolutionIterator(
+ mockRow,
+                        new RecordReader.RecordIterator[] {iterator1, null, iterator2, iterator3});
+
+ evolutionIterator.releaseBatch();
+
+ verify(iterator1).releaseBatch();
+ verify(iterator2).releaseBatch();
+ verify(iterator3).releaseBatch();
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionRowTest.java b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionRowTest.java
new file mode 100644
index 0000000000..d1b40cd0b2
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionRowTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link DataEvolutionRow}. */
+public class DataEvolutionRowTest {
+
+ private InternalRow row1;
+ private InternalRow row2;
+ private DataEvolutionRow dataEvolutionRow;
+
+ @BeforeEach
+ public void setUp() {
+ row1 = mock(InternalRow.class);
+ row2 = mock(InternalRow.class);
+
+ // Schema: (from row1), (from row2), (null), (from row1)
+ int[] rowOffsets = new int[] {0, 1, -1, 0};
+ int[] fieldOffsets = new int[] {0, 0, -1, 1};
+
+ dataEvolutionRow = new DataEvolutionRow(2, rowOffsets, fieldOffsets);
+ dataEvolutionRow.setRow(0, row1);
+ dataEvolutionRow.setRow(1, row2);
+ }
+
+ @Test
+ public void testGetFieldCount() {
+ assertThat(dataEvolutionRow.getFieldCount()).isEqualTo(4);
+ }
+
+ @Test
+ public void testSetRowOutOfBounds() {
+        assertThatThrownBy(() -> dataEvolutionRow.setRow(2, mock(InternalRow.class)))
+ .isInstanceOf(IndexOutOfBoundsException.class)
+ .hasMessage("Position 2 is out of bounds for rows size 2");
+ }
+
+ @Test
+ public void testRowKind() {
+ dataEvolutionRow.setRowKind(RowKind.INSERT);
+ verify(row1).setRowKind(RowKind.INSERT);
+
+ when(row1.getRowKind()).thenReturn(RowKind.DELETE);
+ assertThat(dataEvolutionRow.getRowKind()).isEqualTo(RowKind.DELETE);
+ }
+
+ @Test
+ public void testIsNullAt() {
+ // Test null from rowOffsets (field added by schema evolution)
+ assertThat(dataEvolutionRow.isNullAt(2)).isTrue();
+
+ // Test null from underlying row
+ when(row1.isNullAt(0)).thenReturn(true);
+ assertThat(dataEvolutionRow.isNullAt(0)).isTrue();
+
+ // Test not null
+ when(row2.isNullAt(0)).thenReturn(false);
+ assertThat(dataEvolutionRow.isNullAt(1)).isFalse();
+ }
+
+ @Test
+ public void testGetBoolean() {
+ when(row1.getBoolean(0)).thenReturn(true);
+ assertThat(dataEvolutionRow.getBoolean(0)).isTrue();
+ }
+
+ @Test
+ public void testGetByte() {
+ when(row1.getByte(0)).thenReturn((byte) 1);
+ assertThat(dataEvolutionRow.getByte(0)).isEqualTo((byte) 1);
+ }
+
+ @Test
+ public void testGetShort() {
+ when(row1.getShort(0)).thenReturn((short) 2);
+ assertThat(dataEvolutionRow.getShort(0)).isEqualTo((short) 2);
+ }
+
+ @Test
+ public void testGetInt() {
+ when(row1.getInt(0)).thenReturn(3);
+ assertThat(dataEvolutionRow.getInt(0)).isEqualTo(3);
+ }
+
+ @Test
+ public void testGetLong() {
+ when(row2.getLong(0)).thenReturn(4L);
+ assertThat(dataEvolutionRow.getLong(1)).isEqualTo(4L);
+ }
+
+ @Test
+ public void testGetFloat() {
+ when(row1.getFloat(1)).thenReturn(5.5f);
+ assertThat(dataEvolutionRow.getFloat(3)).isEqualTo(5.5f);
+ }
+
+ @Test
+ public void testGetDouble() {
+ when(row2.getDouble(0)).thenReturn(6.6d);
+ assertThat(dataEvolutionRow.getDouble(1)).isEqualTo(6.6d);
+ }
+
+ @Test
+ public void testGetString() {
+ BinaryString value = BinaryString.fromString("test");
+ when(row1.getString(1)).thenReturn(value);
+ assertThat(dataEvolutionRow.getString(3)).isSameAs(value);
+ }
+
+ @Test
+ public void testGetDecimal() {
+ Decimal value = Decimal.fromUnscaledLong(123, 5, 2);
+ when(row1.getDecimal(1, 5, 2)).thenReturn(value);
+ assertThat(dataEvolutionRow.getDecimal(3, 5, 2)).isSameAs(value);
+ }
+
+ @Test
+ public void testGetTimestamp() {
+ Timestamp value = Timestamp.fromEpochMillis(1000);
+ when(row2.getTimestamp(0, 3)).thenReturn(value);
+ assertThat(dataEvolutionRow.getTimestamp(1, 3)).isSameAs(value);
+ }
+
+ @Test
+ public void testGetBinary() {
+ byte[] value = new byte[] {1, 2, 3};
+ when(row1.getBinary(1)).thenReturn(value);
+ assertThat(dataEvolutionRow.getBinary(3)).isSameAs(value);
+ }
+
+ @Test
+ public void testGetVariant() {
+ Variant value = mock(Variant.class);
+ when(row1.getVariant(1)).thenReturn(value);
+ assertThat(dataEvolutionRow.getVariant(3)).isSameAs(value);
+ }
+
+ @Test
+ public void testGetArray() {
+ InternalArray value = mock(InternalArray.class);
+ when(row2.getArray(0)).thenReturn(value);
+ assertThat(dataEvolutionRow.getArray(1)).isSameAs(value);
+ }
+
+ @Test
+ public void testGetMap() {
+ InternalMap value = mock(InternalMap.class);
+ when(row1.getMap(1)).thenReturn(value);
+ assertThat(dataEvolutionRow.getMap(3)).isSameAs(value);
+ }
+
+ @Test
+ public void testGetRow() {
+ InternalRow value = mock(InternalRow.class);
+ when(row2.getRow(0, 5)).thenReturn(value);
+ assertThat(dataEvolutionRow.getRow(1, 5)).isSameAs(value);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index be3d1662c1..0d9ad3b121 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -97,8 +97,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
schema,
rowType,
FileFormatDiscover.of(options),
- pathFactory(),
- options.fileIndexReadEnabled());
+ pathFactory());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index af1e5b767f..bf6bf1ea22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -20,15 +20,19 @@ package org.apache.paimon.operation;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataFileRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.partition.PartitionUtils;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.DataEvolutionFileReader;
-import org.apache.paimon.reader.EmptyFileRecordReader;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
@@ -40,23 +44,36 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.Builder;
-import org.apache.paimon.utils.IOExceptionSupplier;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.lang.String.format;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
-public class DataEvolutionSplitRead extends RawFileSplitRead {
+/**
+ * A union {@link SplitRead} to read multiple inner files and merge their columns. Note that this
+ * class does not support filter push down or deletion vectors, as they can interfere with the
+ * process of merging columns.
+ */
+public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
+
+ private final FileIO fileIO;
+ private final SchemaManager schemaManager;
+ private final TableSchema schema;
+ private final FileFormatDiscover formatDiscover;
+ private final FileStorePathFactory pathFactory;
+ private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
+
+ protected RowType readRowType;
public DataEvolutionSplitRead(
FileIO fileIO,
@@ -64,66 +81,83 @@ public class DataEvolutionSplitRead extends RawFileSplitRead {
TableSchema schema,
RowType rowType,
FileFormatDiscover formatDiscover,
- FileStorePathFactory pathFactory,
- // TODO: Enabled file index in merge fields read
- boolean fileIndexReadEnabled) {
- super(
- fileIO,
- schemaManager,
- schema,
- rowType,
- formatDiscover,
- pathFactory,
- fileIndexReadEnabled,
- true);
+ FileStorePathFactory pathFactory) {
+ this.fileIO = fileIO;
+ this.schemaManager = schemaManager;
+ this.schema = schema;
+ this.formatDiscover = formatDiscover;
+ this.pathFactory = pathFactory;
+ this.formatReaderMappings = new HashMap<>();
+ this.readRowType = rowType;
}
@Override
- public RecordReader<InternalRow> createReader(
- BinaryRow partition,
- int bucket,
- List<DataFileMeta> files,
-            @Nullable Map<String, IOExceptionSupplier<DeletionVector>> dvFactories)
- throws IOException {
+ public SplitRead<InternalRow> forceKeepDelete() {
+ return this;
+ }
+
+ @Override
+    public SplitRead<InternalRow> withIOManager(@Nullable IOManager ioManager) {
+ return this;
+ }
+
+ @Override
+ public SplitRead<InternalRow> withReadType(RowType readRowType) {
+ this.readRowType = readRowType;
+ return this;
+ }
+
+ @Override
+ public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) {
+ return this;
+ }
+
+ @Override
+    public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
+ List<DataFileMeta> files = split.dataFiles();
+ BinaryRow partition = split.partition();
DataFilePathFactory dataFilePathFactory =
- pathFactory.createDataFilePathFactory(partition, bucket);
+                pathFactory.createDataFilePathFactory(partition, split.bucket());
List<ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
- Builder formatReaderMappingBuilder = formatBuilder();
+ Builder formatBuilder =
+ new Builder(
+ formatDiscover,
+ readRowType.getFields(),
+                        schema -> rowTypeWithRowLineage(schema.logicalRowType(), true).getFields(),
+ null);
List<List<DataFileMeta>> splitByRowId = DataEvolutionSplitGenerator.split(files);
for (List<DataFileMeta> needMergeFiles : splitByRowId) {
if (needMergeFiles.size() == 1) {
// No need to merge fields, just create a single file reader
suppliers.add(
- createFileReader(
- partition,
- dataFilePathFactory,
- needMergeFiles.get(0),
- formatReaderMappingBuilder,
- dvFactories));
+ () ->
+ createFileReader(
+ partition,
+ dataFilePathFactory,
+ needMergeFiles.get(0),
+ formatBuilder));
} else {
suppliers.add(
() ->
- createFileReader(
+ createUnionReader(
needMergeFiles,
partition,
dataFilePathFactory,
- formatReaderMappingBuilder,
- dvFactories));
+ formatBuilder));
}
}
return ConcatRecordReader.create(suppliers);
}
- private DataEvolutionFileReader createFileReader(
+ private DataEvolutionFileReader createUnionReader(
List<DataFileMeta> needMergeFiles,
BinaryRow partition,
DataFilePathFactory dataFilePathFactory,
- Builder formatReaderMappingBuilder,
-            @Nullable Map<String, IOExceptionSupplier<DeletionVector>> dvFactories)
+ Builder formatBuilder)
throws IOException {
long rowCount = needMergeFiles.get(0).rowCount();
long firstRowId = needMergeFiles.get(0).firstRowId();
@@ -147,17 +181,6 @@ public class DataEvolutionSplitRead extends RawFileSplitRead {
Arrays.fill(rowOffsets, -1);
Arrays.fill(fieldOffsets, -1);
- IOExceptionSupplier<DeletionVector> dvFactory = null;
- if (dvFactories != null) {
- for (DataFileMeta file : needMergeFiles) {
-                IOExceptionSupplier<DeletionVector> temp = dvFactories.get(file.fileName());
- if (temp != null && temp.get() != null) {
- dvFactory = temp;
- break;
- }
- }
- }
-
for (int i = 0; i < needMergeFiles.size(); i++) {
DataFileMeta file = needMergeFiles.get(i);
String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
@@ -185,34 +208,26 @@ public class DataEvolutionSplitRead extends RawFileSplitRead {
}
if (readFields.isEmpty()) {
- fileRecordReaders[i] = new EmptyFileRecordReader<>();
- continue;
+ fileRecordReaders[i] = null;
+ } else {
+                // create a new FormatReaderMapping to read partial fields
+ List<String> readFieldNames =
+                        readFields.stream().map(DataField::name).collect(Collectors.toList());
+ FormatReaderMapping formatReaderMapping =
+ formatReaderMappings.computeIfAbsent(
+                                new FormatKey(file.schemaId(), formatIdentifier, readFieldNames),
+ key ->
+ formatBuilder.build(
+ formatIdentifier,
+ schema,
+ dataSchema,
+ readFields,
+ false));
+ fileRecordReaders[i] =
+                    createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
}
-
- Supplier<FormatReaderMapping> formatSupplier =
- () ->
- formatReaderMappingBuilder.build(
- formatIdentifier,
- schema,
- dataSchema,
- readFields,
- // TODO: enabled filter push down
- false);
-
- FormatReaderMapping formatReaderMapping =
- formatReaderMappings.computeIfAbsent(
- new FormatKey(
- file.schemaId(),
- formatIdentifier,
- readFields.stream()
- .map(DataField::name)
- .collect(Collectors.toList())),
- key -> formatSupplier.get());
-
- fileRecordReaders[i] =
- createFileReader(
-                            partition, file, dataFilePathFactory, formatReaderMapping, dvFactory);
}
+
for (int i = 0; i < rowOffsets.length; i++) {
if (rowOffsets[i] == -1) {
checkArgument(
@@ -222,6 +237,50 @@ public class DataEvolutionSplitRead extends RawFileSplitRead {
allReadFields.get(i)));
}
}
+
return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders);
}
+
+ private FileRecordReader<InternalRow> createFileReader(
+ BinaryRow partition,
+ DataFilePathFactory dataFilePathFactory,
+ DataFileMeta file,
+ Builder formatBuilder)
+ throws IOException {
+        String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
+ long schemaId = file.schemaId();
+ FormatReaderMapping formatReaderMapping =
+ formatReaderMappings.computeIfAbsent(
+ new FormatKey(file.schemaId(), formatIdentifier),
+ key ->
+ formatBuilder.build(
+ formatIdentifier,
+ schema,
+ schemaId == schema.id()
+ ? schema
+                                                : schemaManager.schema(schemaId));
+        return createFileReader(partition, file, dataFilePathFactory, formatReaderMapping);
+ }
+
+ private FileRecordReader<InternalRow> createFileReader(
+ BinaryRow partition,
+ DataFileMeta file,
+ DataFilePathFactory dataFilePathFactory,
+ FormatReaderMapping formatReaderMapping)
+ throws IOException {
+ FormatReaderContext formatReaderContext =
+ new FormatReaderContext(
+                        fileIO, dataFilePathFactory.toPath(file), file.fileSize(), null);
+ return new DataFileRecordReader(
+ schema.logicalRowType(),
+ formatReaderMapping.getReaderFactory(),
+ formatReaderContext,
+ formatReaderMapping.getIndexMapping(),
+ formatReaderMapping.getCastMapping(),
+                PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
+ true,
+ file.firstRowId(),
+ file.maxSequenceNumber(),
+ formatReaderMapping.getSystemFields());
+ }
}
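Since DataEvolutionSplitRead now implements SplitRead<InternalRow> directly instead of extending RawFileSplitRead, callers drive it through the same interface as the other split reads. A hedged usage sketch, where splitRead, readType and dataSplit are placeholders for objects obtained from the table's store and scan:

    // withFilter / withIOManager / forceKeepDelete are deliberate no-ops on this read path;
    // only the projected read type is honoured before building the reader for one DataSplit.
    SplitRead<InternalRow> read =
            splitRead.withReadType(readType).withFilter(null).withIOManager(null);
    RecordReader<InternalRow> reader = read.createReader(dataSplit);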
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 67591e1bf4..6aac9de937 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -44,7 +44,6 @@ import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -63,9 +62,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.Supplier;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+import static org.apache.paimon.table.SpecialFields.rowTypeWithRowLineage;
/** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
public class RawFileSplitRead implements SplitRead<InternalRow> {
@@ -73,16 +72,16 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
private static final Logger LOG = LoggerFactory.getLogger(RawFileSplitRead.class);
private final FileIO fileIO;
- protected final SchemaManager schemaManager;
- protected final TableSchema schema;
- protected final FileFormatDiscover formatDiscover;
- protected final FileStorePathFactory pathFactory;
- protected final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
- protected final boolean fileIndexReadEnabled;
- protected final boolean rowTrackingEnabled;
+ private final SchemaManager schemaManager;
+ private final TableSchema schema;
+ private final FileFormatDiscover formatDiscover;
+ private final FileStorePathFactory pathFactory;
+ private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
+ private final boolean fileIndexReadEnabled;
+ private final boolean rowTrackingEnabled;
- protected RowType readRowType;
- @Nullable protected List<Predicate> filters;
+ private RowType readRowType;
+ @Nullable private List<Predicate> filters;
public RawFileSplitRead(
FileIO fileIO,
@@ -130,7 +129,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
@Override
public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
- if (split.beforeFiles().size() > 0) {
+ if (!split.beforeFiles().isEmpty()) {
LOG.info("Ignore split before files: {}", split.beforeFiles());
}
@@ -154,7 +153,18 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
pathFactory.createDataFilePathFactory(partition, bucket);
List<ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
- Builder formatReaderMappingBuilder = formatBuilder();
+ Builder formatReaderMappingBuilder =
+ new Builder(
+ formatDiscover,
+ readRowType.getFields(),
+ schema -> {
+ if (rowTrackingEnabled) {
+                            return rowTypeWithRowLineage(schema.logicalRowType(), true)
+ .getFields();
+ }
+ return schema.fields();
+ },
+ filters);
for (DataFileMeta file : files) {
suppliers.add(
@@ -169,26 +179,25 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
return ConcatRecordReader.create(suppliers);
}
- protected ReaderSupplier<InternalRow> createFileReader(
+ private ReaderSupplier<InternalRow> createFileReader(
BinaryRow partition,
DataFilePathFactory dataFilePathFactory,
DataFileMeta file,
- Builder formatReaderMappingBuilder,
+ Builder formatBuilder,
@Nullable Map<String, IOExceptionSupplier<DeletionVector>> dvFactories) {
String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
long schemaId = file.schemaId();
- Supplier<FormatReaderMapping> formatSupplier =
- () ->
- formatReaderMappingBuilder.build(
- formatIdentifier,
- schema,
-                                schemaId == schema.id() ? schema : schemaManager.schema(schemaId));
-
FormatReaderMapping formatReaderMapping =
formatReaderMappings.computeIfAbsent(
new FormatKey(file.schemaId(), formatIdentifier),
- key -> formatSupplier.get());
+ key ->
+ formatBuilder.build(
+ formatIdentifier,
+ schema,
+ schemaId == schema.id()
+ ? schema
+                                                : schemaManager.schema(schemaId));
IOExceptionSupplier<DeletionVector> dvFactory =
dvFactories == null ? null : dvFactories.get(file.fileName());
@@ -197,7 +206,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
partition, file, dataFilePathFactory, formatReaderMapping, dvFactory);
}
- protected FileRecordReader<InternalRow> createFileReader(
+ private FileRecordReader<InternalRow> createFileReader(
BinaryRow partition,
DataFileMeta file,
DataFilePathFactory dataFilePathFactory,
@@ -265,19 +274,4 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
}
return fileRecordReader;
}
-
- protected Builder formatBuilder() {
- return new Builder(
- formatDiscover,
- readRowType.getFields(),
- tableSchema -> {
- if (rowTrackingEnabled) {
- return SpecialFields.rowTypeWithRowLineage(
- tableSchema.logicalRowType(), true)
- .getFields();
- }
- return tableSchema.fields();
- },
- filters);
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 06d7d4117f..064630023d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -238,9 +238,7 @@ public class SchemaValidation {
validateMergeFunctionFactory(schema);
- validateRowLineage(schema, options);
-
- validateDataEvolution(options);
+ validateRowTracking(schema, options);
}
public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
@@ -636,8 +634,9 @@ public class SchemaValidation {
}
}
-    private static void validateRowLineage(TableSchema schema, CoreOptions options) {
-        if (options.rowTrackingEnabled()) {
+    private static void validateRowTracking(TableSchema schema, CoreOptions options) {
+ boolean rowTrackingEnabled = options.rowTrackingEnabled();
+ if (rowTrackingEnabled) {
checkArgument(
options.bucket() == -1,
"Cannot define %s for row lineage table, it only support
bucket = -1",
@@ -647,13 +646,14 @@ public class SchemaValidation {
"Cannot define %s for row lineage table.",
PRIMARY_KEY.key());
}
- }
- private static void validateDataEvolution(CoreOptions options) {
if (options.dataEvolutionEnabled()) {
checkArgument(
- options.rowTrackingEnabled(),
+ rowTrackingEnabled,
"Data evolution config must enabled with
row-tracking.enabled");
+ checkArgument(
+ !options.deletionVectorsEnabled(),
+                    "Data evolution config must disabled with deletion-vectors.enabled");
}
}
}
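With the merged validation, the involved options have to agree before a data evolution table is accepted. A sketch of the accepted combination; the data-evolution key name is inferred from CoreOptions#dataEvolutionEnabled and may differ, while 'row-tracking.enabled' and 'deletion-vectors.enabled' are quoted from the messages above:

    Map<String, String> tableOptions = new HashMap<>();
    tableOptions.put("row-tracking.enabled", "true");      // required by data evolution
    tableOptions.put("data-evolution.enabled", "true");    // inferred key, see CoreOptions
    tableOptions.put("deletion-vectors.enabled", "false"); // must stay disabled
    // In addition, bucket must remain -1 and no primary key may be defined (checked above).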