This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 75b44862bbee640a39088ddf2d0b1cbc399ff38d Author: YeJunHao <[email protected]> AuthorDate: Mon Nov 3 20:00:19 2025 +0800 [core] Support non null column with write type (#6513) --- .../apache/paimon/table/sink/TableWriteImpl.java | 15 +++++--- .../paimon/table/DataEvolutionTableTest.java | 41 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 325911b0ea..493a2b4ffe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -54,7 +54,6 @@ import static org.apache.paimon.utils.Preconditions.checkState; */ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>> { - private final RowType rowType; private final FileStoreWrite<T> write; private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor; private final RecordExtractor<T> recordExtractor; @@ -63,8 +62,9 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State private boolean batchCommitted = false; private BucketMode bucketMode; + private RowType writeType; + private int[] notNullFieldIndex; - private final int[] notNullFieldIndex; private final @Nullable DefaultValueRow defaultValueRow; public TableWriteImpl( @@ -74,7 +74,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State RecordExtractor<T> recordExtractor, @Nullable RowKindGenerator rowKindGenerator, @Nullable RowKindFilter rowKindFilter) { - this.rowType = rowType; + this.writeType = rowType; this.write = write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; @@ -115,6 +115,13 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State @Override public TableWriteImpl<T> withWriteType(RowType writeType) { write.withWriteType(writeType); + this.writeType = writeType; + List<String> notNullColumnNames = + writeType.getFields().stream() + .filter(field -> !field.type().isNullable()) + .map(DataField::name) + .collect(Collectors.toList()); + this.notNullFieldIndex = writeType.getFieldIndices(notNullColumnNames); return this; } @@ -189,7 +196,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State private void checkNullability(InternalRow row) { for (int idx : notNullFieldIndex) { if (row.isNullAt(idx)) { - String columnName = rowType.getFields().get(idx).name(); + String columnName = writeType.getFields().get(idx).name(); throw new RuntimeException( String.format("Cannot write null to non-null column(%s)", columnName)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java index dc4db3da5b..0cd2cf4608 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java @@ -419,6 +419,47 @@ public class DataEvolutionTableTest extends TableTestBase { .isEqualTo(2); } + @Test + public void testNonNullColumn() throws Exception { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.STRING().copy(false)); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + + Schema schema = schemaBuilder.build(); + + catalog.createTable(identifier(), schema, true); + Table table = catalog.getTable(identifier()); + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + BatchTableWrite write = builder.newWrite(); + write.write(GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b"))); + BatchTableCommit commit = builder.newCommit(); + List<CommitMessage> commitables = write.prepareCommit(); + commit.commit(commitables); + + write = + builder.newWrite() + .withWriteType(schema.rowType().project(Collections.singletonList("f2"))); + write.write(GenericRow.of(BinaryString.fromString("c"))); + commit = builder.newCommit(); + commitables = write.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + + ReadBuilder readBuilder = table.newReadBuilder(); + RecordReader<InternalRow> reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + assertThat(reader).isInstanceOf(DataEvolutionFileReader.class); + reader.forEachRemaining( + r -> { + assertThat(r.getInt(0)).isEqualTo(1); + assertThat(r.getString(1).toString()).isEqualTo("a"); + assertThat(r.getString(2).toString()).isEqualTo("c"); + }); + } + protected Schema schemaDefault() { Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("f0", DataTypes.INT());
