This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4bcd867e0f [core] Support non null column with write type (#6513)
4bcd867e0f is described below
commit 4bcd867e0f1e266f0f55fd3612d5cbf8ee11abe6
Author: YeJunHao <[email protected]>
AuthorDate: Mon Nov 3 20:00:19 2025 +0800
[core] Support non null column with write type (#6513)
---
.../apache/paimon/table/sink/TableWriteImpl.java | 15 +++++---
.../paimon/table/DataEvolutionTableTest.java | 41 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 4 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 8d5757dcd3..82b0b87439 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -53,7 +53,6 @@ import static
org.apache.paimon.utils.Preconditions.checkState;
*/
public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State<T>>> {
- private final RowType rowType;
private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
@@ -62,8 +61,9 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
private boolean batchCommitted = false;
private BucketMode bucketMode;
+ private RowType writeType;
+ private int[] notNullFieldIndex;
- private final int[] notNullFieldIndex;
private final @Nullable DefaultValueRow defaultValueRow;
public TableWriteImpl(
@@ -73,7 +73,7 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
RecordExtractor<T> recordExtractor,
@Nullable RowKindGenerator rowKindGenerator,
@Nullable RowKindFilter rowKindFilter) {
- this.rowType = rowType;
+ this.writeType = rowType;
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
@@ -114,6 +114,13 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
@Override
public TableWriteImpl<T> withWriteType(RowType writeType) {
write.withWriteType(writeType);
+ this.writeType = writeType;
+ List<String> notNullColumnNames =
+ writeType.getFields().stream()
+ .filter(field -> !field.type().isNullable())
+ .map(DataField::name)
+ .collect(Collectors.toList());
+ this.notNullFieldIndex = writeType.getFieldIndices(notNullColumnNames);
return this;
}
@@ -188,7 +195,7 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
private void checkNullability(InternalRow row) {
for (int idx : notNullFieldIndex) {
if (row.isNullAt(idx)) {
- String columnName = rowType.getFields().get(idx).name();
+ String columnName = writeType.getFields().get(idx).name();
throw new RuntimeException(
String.format("Cannot write null to non-null
column(%s)", columnName));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 65cd919e63..9b2655f340 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -600,6 +600,47 @@ public class DataEvolutionTableTest extends TableTestBase {
assertThat(i.get()).isEqualTo(2);
}
+ @Test
+ public void testNonNullColumn() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.STRING().copy(false));
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
+ Schema schema = schemaBuilder.build();
+
+ catalog.createTable(identifier(), schema, true);
+ Table table = catalog.getTable(identifier());
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+ BatchTableWrite write = builder.newWrite();
+ write.write(GenericRow.of(1, BinaryString.fromString("a"),
BinaryString.fromString("b")));
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitables = write.prepareCommit();
+ commit.commit(commitables);
+
+ write =
+ builder.newWrite()
+
.withWriteType(schema.rowType().project(Collections.singletonList("f2")));
+ write.write(GenericRow.of(BinaryString.fromString("c")));
+ commit = builder.newCommit();
+ commitables = write.prepareCommit();
+ setFirstRowId(commitables, 0L);
+ commit.commit(commitables);
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ RecordReader<InternalRow> reader =
+
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+ assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
+ reader.forEachRemaining(
+ r -> {
+ assertThat(r.getInt(0)).isEqualTo(1);
+ assertThat(r.getString(1).toString()).isEqualTo("a");
+ assertThat(r.getString(2).toString()).isEqualTo("c");
+ });
+ }
+
protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());