This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 85b3cdfb18 [core] Add tests for schema evolution in RemoteLookupFileManager
85b3cdfb18 is described below
commit 85b3cdfb186204b7718eed8c5df563dde7f05382
Author: JingsongLi <[email protected]>
AuthorDate: Fri Nov 14 12:45:36 2025 +0800
[core] Add tests for schema evolution in RemoteLookupFileManager
---
.../flink/lookup/DeletionVectorsTableTest.java | 46 ++++++++++++++++++++--
1 file changed, 42 insertions(+), 4 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
index 60d1546d18..0d07826f99 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/DeletionVectorsTableTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -36,6 +37,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
@@ -49,6 +51,7 @@ import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link LookupTable} with deletion vectors. */
public class DeletionVectorsTableTest extends TableTestBase {
@@ -63,6 +66,21 @@ public class DeletionVectorsTableTest extends TableTestBase {
@Test
public void testRemoteFile() throws Exception {
+ innerTestRemoteFile(false, false);
+ }
+
+ @Test
+ public void testRemoteFileSchemEvolution() throws Exception {
+ innerTestRemoteFile(true, false);
+ }
+
+ @Test
+ public void testRemoteFileSchemEvolutionAndNotCompatible() throws Exception {
+ innerTestRemoteFile(true, true);
+ }
+
+ public void innerTestRemoteFile(boolean schemaEvolution, boolean notCompatible)
+ throws Exception {
Options options = new Options();
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
@@ -120,13 +138,33 @@ public class DeletionVectorsTableTest extends TableTestBase {
fileIO.delete(firstPath, false);
// should no exception when lookup in write
- table = table.copy(Collections.singletonMap(CoreOptions.COMPACTION_SIZE_RATIO.key(), "0"));
+ if (schemaEvolution) {
+ catalog.alterTable(
+ identifier,
+ SchemaChange.setOption(CoreOptions.COMPACTION_SIZE_RATIO.key(), "0"),
+ false);
+ if (notCompatible) {
+ catalog.alterTable(
+ identifier, SchemaChange.addColumn("new_col", DataTypes.INT()), false);
+ }
+ table = (FileStoreTable) catalog.getTable(identifier);
+ } else {
+ table =
+ table.copy(
+ Collections.singletonMap(CoreOptions.COMPACTION_SIZE_RATIO.key(), "0"));
+ }
writeBuilder = table.newBatchWriteBuilder();
try (BatchTableWrite write = writeBuilder.newWrite().withIOManager(ioManager);
BatchTableCommit commit = writeBuilder.newCommit()) {
- write.write(GenericRow.of(1, 2));
- write.write(GenericRow.of(2, 2));
- commit.commit(write.prepareCommit());
+ write.write(GenericRow.of(1, 2, 2));
+ write.write(GenericRow.of(2, 2, 2));
+ if (notCompatible) {
+ assertThatThrownBy(() -> commit.commit(write.prepareCommit()))
+ .hasMessageContaining("FileNotFoundException");
+ return;
+ } else {
+ commit.commit(write.prepareCommit());
+ }
}
// restore file and check reading