This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 75b44862bbee640a39088ddf2d0b1cbc399ff38d
Author: YeJunHao <[email protected]>
AuthorDate: Mon Nov 3 20:00:19 2025 +0800

    [core] Support non null column with write type (#6513)
---
 .../apache/paimon/table/sink/TableWriteImpl.java   | 15 +++++---
 .../paimon/table/DataEvolutionTableTest.java       | 41 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 4 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 325911b0ea..493a2b4ffe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -54,7 +54,6 @@ import static 
org.apache.paimon.utils.Preconditions.checkState;
  */
 public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State<T>>> {
 
-    private final RowType rowType;
     private final FileStoreWrite<T> write;
     private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
     private final RecordExtractor<T> recordExtractor;
@@ -63,8 +62,9 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
 
     private boolean batchCommitted = false;
     private BucketMode bucketMode;
+    private RowType writeType;
+    private int[] notNullFieldIndex;
 
-    private final int[] notNullFieldIndex;
     private final @Nullable DefaultValueRow defaultValueRow;
 
     public TableWriteImpl(
@@ -74,7 +74,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
             RecordExtractor<T> recordExtractor,
             @Nullable RowKindGenerator rowKindGenerator,
             @Nullable RowKindFilter rowKindFilter) {
-        this.rowType = rowType;
+        this.writeType = rowType;
         this.write = write;
         this.keyAndBucketExtractor = keyAndBucketExtractor;
         this.recordExtractor = recordExtractor;
@@ -115,6 +115,13 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
     @Override
     public TableWriteImpl<T> withWriteType(RowType writeType) {
         write.withWriteType(writeType);
+        this.writeType = writeType;
+        List<String> notNullColumnNames =
+                writeType.getFields().stream()
+                        .filter(field -> !field.type().isNullable())
+                        .map(DataField::name)
+                        .collect(Collectors.toList());
+        this.notNullFieldIndex = writeType.getFieldIndices(notNullColumnNames);
         return this;
     }
 
@@ -189,7 +196,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
     private void checkNullability(InternalRow row) {
         for (int idx : notNullFieldIndex) {
             if (row.isNullAt(idx)) {
-                String columnName = rowType.getFields().get(idx).name();
+                String columnName = writeType.getFields().get(idx).name();
                 throw new RuntimeException(
                         String.format("Cannot write null to non-null 
column(%s)", columnName));
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index dc4db3da5b..0cd2cf4608 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -419,6 +419,47 @@ public class DataEvolutionTableTest extends TableTestBase {
                 .isEqualTo(2);
     }
 
+    @Test
+    public void testNonNullColumn() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.STRING().copy(false));
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
+        Schema schema = schemaBuilder.build();
+
+        catalog.createTable(identifier(), schema, true);
+        Table table = catalog.getTable(identifier());
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        BatchTableWrite write = builder.newWrite();
+        write.write(GenericRow.of(1, BinaryString.fromString("a"), 
BinaryString.fromString("b")));
+        BatchTableCommit commit = builder.newCommit();
+        List<CommitMessage> commitables = write.prepareCommit();
+        commit.commit(commitables);
+
+        write =
+                builder.newWrite()
+                        
.withWriteType(schema.rowType().project(Collections.singletonList("f2")));
+        write.write(GenericRow.of(BinaryString.fromString("c")));
+        commit = builder.newCommit();
+        commitables = write.prepareCommit();
+        setFirstRowId(commitables, 0L);
+        commit.commit(commitables);
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
+        reader.forEachRemaining(
+                r -> {
+                    assertThat(r.getInt(0)).isEqualTo(1);
+                    assertThat(r.getString(1).toString()).isEqualTo("a");
+                    assertThat(r.getString(2).toString()).isEqualTo("c");
+                });
+    }
+
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());

Reply via email to