This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d28093782 [format] Format writer check null value for non-null field and throw useful message (#6213)
2d28093782 is described below

commit 2d28093782e444e3f0d26c4b788ec292cdf6f947
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 9 13:31:22 2025 +0800

    [format] Format writer check null value for non-null field and throw useful message (#6213)
---
 .../paimon/arrow/writer/ArrowFieldWriter.java      |  12 +-
 .../apache/paimon/format/FormatReadWriteTest.java  |  23 +++
 .../paimon/flink/ContinuousFileStoreITCase.java    |   8 +-
 .../paimon/format/avro/AvroRowDatumWriter.java     |  11 +-
 .../paimon/format/avro/FieldWriterFactory.java     |  27 ++-
 .../apache/paimon/format/orc/OrcFileFormat.java    |  10 +-
 .../format/orc/writer/RowDataVectorizer.java       |  36 +++-
 .../parquet/writer/ParquetRowDataWriter.java       | 202 +++++++--------------
 .../paimon/format/orc/OrcWriterFactoryTest.java    |   7 +-
 9 files changed, 164 insertions(+), 172 deletions(-)
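
For context, every format writer touched by this change follows the same pattern: record each field's name and nullability when the writer is constructed, then raise a descriptive IllegalArgumentException as soon as a null value arrives for a non-nullable field. A minimal sketch of that shared pattern follows (a simplified, hypothetical helper class, not code from this patch; the real implementations are the writer classes in the diff below):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.types.DataField;

    import java.util.List;

    // Hypothetical helper for illustration only (not part of this patch):
    // field names and nullability are cached at construction time, and a
    // descriptive exception replaces the bare NullPointerException that
    // the writers previously surfaced.
    class NonNullFieldCheck {

        private final String[] fieldNames;
        private final boolean[] isNullable;

        NonNullFieldCheck(List<DataField> fields) {
            this.fieldNames = new String[fields.size()];
            this.isNullable = new boolean[fields.size()];
            for (int i = 0; i < fields.size(); i++) {
                fieldNames[i] = fields.get(i).name();
                isNullable[i] = fields.get(i).type().isNullable();
            }
        }

        // A writer calls this when it encounters a null at field position i.
        void checkNullAllowed(int i) {
            if (!isNullable[i]) {
                throw new IllegalArgumentException(
                        String.format(
                                "Field '%s' expected not null but found null value. "
                                        + "A possible cause is that the table used %s or %s "
                                        + "merge-engine and the aggregate function produced "
                                        + "null value when retracting.",
                                fieldNames[i],
                                CoreOptions.MergeEngine.PARTIAL_UPDATE,
                                CoreOptions.MergeEngine.AGGREGATE));
            }
        }
    }

The Avro path cannot pre-check in quite the same way, because a null only surfaces as a NullPointerException deep inside the Avro encoder; as the FieldWriterFactory hunk below shows, its RowWriter catches that NPE and rethrows this message only when the failing field is indeed non-nullable and null, otherwise the original NPE propagates.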

diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java
index 77c66cc2b7..e785d48bf0 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.arrow.writer;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.DataGetters;
 import org.apache.paimon.data.columnar.ColumnVector;
 
@@ -25,8 +26,6 @@ import org.apache.arrow.vector.FieldVector;
 
 import javax.annotation.Nullable;
 
-import static java.lang.String.format;
-
 /** A reusable writer to convert a field into Arrow {@link FieldVector}. */
 public abstract class ArrowFieldWriter {
 
@@ -73,9 +72,12 @@ public abstract class ArrowFieldWriter {
         if (getters.isNullAt(pos)) {
             if (!isNullable) {
                 throw new IllegalArgumentException(
-                        format(
-                                "Arrow does not support null values in non-nullable fields. Field name : %s expected not null but found null",
-                                fieldVector.getName()));
+                        String.format(
+                                "Field '%s' expected not null but found null value. A possible cause is that the table used "
+                                        + "%s or %s merge-engine and the aggregate function produced null value when retracting.",
+                                fieldVector.getName(),
+                                CoreOptions.MergeEngine.PARTIAL_UPDATE,
+                                CoreOptions.MergeEngine.AGGREGATE));
             }
             fieldVector.setNull(rowIndex);
         } else {
diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index 14698a47f8..a54796f06b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -57,7 +57,10 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.IntStream;
 
 import static org.apache.paimon.data.BinaryString.fromString;
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /** Test Base class for Format. */
 public abstract class FormatReadWriteTest {
@@ -249,6 +252,26 @@ public abstract class FormatReadWriteTest {
         assertThat(array.getVariant(1).toJson()).isEqualTo("{\"age\":45,\"city\":\"Beijing\"}");
     }
 
+    @Test
+    public void testWriteNullToNonNullField() {
+        FileFormat format = fileFormat();
+        String identifier = format.getFormatIdentifier();
+        // no agg for these formats now
+        assumeTrue(!identifier.equals("csv") && !identifier.equals("json"));
+
+        FormatWriterFactory factory =
+                format.createWriterFactory(
+                        RowType.builder().field("f0", DataTypes.INT().notNull()).build());
+
+        assertThatThrownBy(() -> write(factory, file, GenericRow.of((Object) null)))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Field 'f0' expected not null but found null 
value. A possible cause is "
+                                        + "that the table used partial-update 
or aggregation merge-engine and the aggregate "
+                                        + "function produced null value when 
retracting."));
+    }
+
     protected void write(FormatWriterFactory factory, Path file, InternalRow... rows)
             throws IOException {
         FormatWriter writer;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 689afe09fa..317ca24175 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.SnapshotTest.newSnapshotManager;
@@ -641,7 +642,7 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testAvroRetractNotNullField() {
+    public void testAvroRetractNotNullField() throws ExecutionException, InterruptedException {
         List<Row> input =
                 Arrays.asList(
                         Row.ofKind(RowKind.INSERT, 1, "A"), Row.ofKind(RowKind.DELETE, 1, "A"));
@@ -661,7 +662,8 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
                         () -> sEnv.executeSql("INSERT INTO avro_sink select * from source").await())
                 .satisfies(
                         anyCauseMatches(
-                                RuntimeException.class,
-                                "Caught NullPointerException, the possible 
reason is you have set following options together"));
+                                IllegalArgumentException.class,
+                                "Field 'a' expected not null but found null 
value. A possible cause is that "
+                                        + "the table used partial-update or 
aggregation merge-engine and the aggregate function produced null value when 
retracting."));
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
index d302451625..c2bd81d003 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
@@ -56,15 +56,6 @@ public class AvroRowDatumWriter implements DatumWriter<InternalRow> {
             // top Row is a UNION type
             out.writeIndex(1);
         }
-        try {
-            this.writer.writeRow(datum, out);
-        } catch (NullPointerException npe) {
-            throw new RuntimeException(
-                    "Caught NullPointerException, the possible reason is you 
have set following options together:\n"
-                            + "  1. file.format = avro;\n"
-                            + "  2. merge-function = 
aggregation/partial-update;\n"
-                            + "  3. some fields are not null.",
-                    npe);
-        }
+        this.writer.writeRow(datum, out);
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
index a8b659a749..e108959ffe 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format.avro;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.DataGetters;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
@@ -240,14 +241,20 @@ public class FieldWriterFactory implements AvroSchemaVisitor<FieldWriter> {
     public class RowWriter implements FieldWriter {
 
         private final FieldWriter[] fieldWriters;
+        private final String[] fieldNames;
+        private final boolean[] isNullable;
 
         private RowWriter(Schema schema, List<DataField> fields) {
             List<Schema.Field> schemaFields = schema.getFields();
             this.fieldWriters = new FieldWriter[schemaFields.size()];
+            this.fieldNames = new String[schemaFields.size()];
+            this.isNullable = new boolean[schemaFields.size()];
+            for (int i = 0, fieldsSize = schemaFields.size(); i < fieldsSize; i++) {
                 Schema.Field field = schemaFields.get(i);
                 DataType type = fields.get(i).type();
                 fieldWriters[i] = visit(field.schema(), type);
+                fieldNames[i] = field.name();
+                isNullable[i] = type.isNullable();
             }
         }
 
@@ -258,8 +265,24 @@ public class FieldWriterFactory implements AvroSchemaVisitor<FieldWriter> {
         }
 
         public void writeRow(InternalRow row, Encoder encoder) throws IOException {
-            for (int i = 0; i < fieldWriters.length; i += 1) {
-                fieldWriters[i].write(row, i, encoder);
+            int i = 0;
+            try {
+                for (; i < fieldWriters.length; i += 1) {
+                    fieldWriters[i].write(row, i, encoder);
+                }
+            } catch (NullPointerException npe) {
+                if (!isNullable[i] && row.isNullAt(i)) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Field '%s' expected not null but found 
null value. A possible cause is that the "
+                                            + "table used %s or %s 
merge-engine and the aggregate function produced "
+                                            + "null value when retracting.",
+                                    fieldNames[i],
+                                    CoreOptions.MergeEngine.PARTIAL_UPDATE,
+                                    CoreOptions.MergeEngine.AGGREGATE));
+                } else {
+                    throw npe;
+                }
             }
         }
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 99e7a9efdd..76286b8cce 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -57,7 +57,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.format.OrcOptions.ORC_TIMESTAMP_LTZ_LEGACY_TYPE;
-import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
 
 /** Orc {@link FileFormat}. */
 @ThreadSafe
@@ -144,12 +143,11 @@ public class OrcFileFormat extends FileFormat {
      */
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
-        DataType refinedType = refineDataType(type);
-        DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]);
-
-        TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
+        RowType refinedType = (RowType) refineDataType(type);
+        TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema(refinedType);
         Vectorizer<InternalRow> vectorizer =
-                new RowDataVectorizer(typeDescription, orcTypes, legacyTimestampLtzType);
+                new RowDataVectorizer(
+                        typeDescription, refinedType.getFields(), legacyTimestampLtzType);
 
         return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index 51c7acf170..5cb37f191a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -18,30 +18,36 @@
 
 package org.apache.paimon.format.orc.writer;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
 
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** A {@link Vectorizer} of {@link InternalRow} type element. */
 public class RowDataVectorizer extends Vectorizer<InternalRow> {
 
-    private final List<FieldWriter> fieldWriters;
+    private final FieldWriter[] fieldWriters;
+    private final String[] fieldNames;
+    private final boolean[] isNullable;
 
     public RowDataVectorizer(
-            TypeDescription schema, DataType[] fieldTypes, boolean legacyTimestampLtzType) {
+            TypeDescription schema, List<DataField> dataFields, boolean legacyTimestampLtzType) {
         super(schema);
         FieldWriterFactory fieldWriterFactory = new FieldWriterFactory(legacyTimestampLtzType);
-        this.fieldWriters =
-                Arrays.stream(fieldTypes)
-                        .map(t -> t.accept(fieldWriterFactory))
-                        .collect(Collectors.toList());
+        this.fieldWriters = new FieldWriter[dataFields.size()];
+        this.fieldNames = new String[dataFields.size()];
+        this.isNullable = new boolean[dataFields.size()];
+        for (int i = 0; i < dataFields.size(); i++) {
+            DataField field = dataFields.get(i);
+            fieldWriters[i] = field.type().accept(fieldWriterFactory);
+            fieldNames[i] = field.name();
+            isNullable[i] = field.type().isNullable();
+        }
     }
 
     @Override
@@ -50,10 +56,20 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {
         for (int i = 0; i < row.getFieldCount(); ++i) {
             ColumnVector fieldColumn = batch.cols[i];
             if (row.isNullAt(i)) {
+                if (!isNullable[i]) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Field '%s' expected not null but found 
null value. A possible cause is that the "
+                                            + "table used %s or %s 
merge-engine and the aggregate function produced "
+                                            + "null value when retracting.",
+                                    fieldNames[i],
+                                    CoreOptions.MergeEngine.PARTIAL_UPDATE,
+                                    CoreOptions.MergeEngine.AGGREGATE));
+                }
                 fieldColumn.noNulls = false;
                 fieldColumn.isNull[rowId] = true;
             } else {
-                fieldWriters.get(i).write(rowId, fieldColumn, row, i);
+                fieldWriters[i].write(rowId, fieldColumn, row, i);
             }
         }
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
index fe62dc2174..c00734b024 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format.parquet.writer;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -55,7 +56,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static java.lang.String.format;
 import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -75,7 +75,7 @@ public class ParquetRowDataWriter {
             RecordConsumer recordConsumer, RowType rowType, GroupType schema, Configuration conf) {
         this.conf = conf;
         this.recordConsumer = recordConsumer;
-        this.rowWriter = new RowWriter(rowType, schema, false);
+        this.rowWriter = new RowWriter(rowType, schema);
     }
 
     /**
@@ -94,37 +94,35 @@ public class ParquetRowDataWriter {
             switch (t.getTypeRoot()) {
                 case CHAR:
                 case VARCHAR:
-                    return new StringWriter(t.isNullable());
+                    return new StringWriter();
                 case BOOLEAN:
-                    return new BooleanWriter(t.isNullable());
+                    return new BooleanWriter();
                 case BINARY:
                 case VARBINARY:
-                    return new BinaryWriter(t.isNullable());
+                    return new BinaryWriter();
                 case DECIMAL:
                     DecimalType decimalType = (DecimalType) t;
-                    return createDecimalWriter(
-                            decimalType.getPrecision(), decimalType.getScale(), t.isNullable());
+                    return createDecimalWriter(decimalType.getPrecision(), decimalType.getScale());
                 case TINYINT:
-                    return new ByteWriter(t.isNullable());
+                    return new ByteWriter();
                 case SMALLINT:
-                    return new ShortWriter(t.isNullable());
+                    return new ShortWriter();
                 case DATE:
                 case TIME_WITHOUT_TIME_ZONE:
                 case INTEGER:
-                    return new IntWriter(t.isNullable());
+                    return new IntWriter();
                 case BIGINT:
-                    return new LongWriter(t.isNullable());
+                    return new LongWriter();
                 case FLOAT:
-                    return new FloatWriter(t.isNullable());
+                    return new FloatWriter();
                 case DOUBLE:
-                    return new DoubleWriter(t.isNullable());
+                    return new DoubleWriter();
                 case TIMESTAMP_WITHOUT_TIME_ZONE:
                     TimestampType timestampType = (TimestampType) t;
-                    return createTimestampWriter(timestampType.getPrecision(), t.isNullable());
+                    return createTimestampWriter(timestampType.getPrecision());
                 case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                     LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) t;
-                    return createTimestampWriter(
-                            localZonedTimestampType.getPrecision(), t.isNullable());
+                    return createTimestampWriter(localZonedTimestampType.getPrecision());
                 default:
                     throw new UnsupportedOperationException("Unsupported type: " + type);
             }
@@ -134,26 +132,19 @@ public class ParquetRowDataWriter {
 
             if (t instanceof ArrayType
                     && annotation instanceof 
LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
-                return new ArrayWriter(((ArrayType) t).getElementType(), 
groupType, t.isNullable());
+                return new ArrayWriter(((ArrayType) t).getElementType(), 
groupType);
             } else if (t instanceof MapType
                     && annotation instanceof 
LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
                 return new MapWriter(
-                        ((MapType) t).getKeyType(),
-                        ((MapType) t).getValueType(),
-                        groupType,
-                        t.isNullable());
+                        ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
             } else if (t instanceof MultisetType
                     && annotation instanceof 
LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
                 return new MapWriter(
-                        ((MultisetType) t).getElementType(),
-                        new IntType(false),
-                        groupType,
-                        t.isNullable());
+                        ((MultisetType) t).getElementType(), new IntType(false), groupType);
             } else if (t instanceof RowType && type instanceof GroupType) {
-                return new RowWriter((RowType) t, groupType, t.isNullable());
+                return new RowWriter((RowType) t, groupType);
             } else if (t instanceof VariantType && type instanceof GroupType) {
                 return new VariantWriter(
-                        t.isNullable(),
                         groupType,
                         VariantUtils.extractShreddingSchemaFromConf(conf, type.getName()));
             } else {
@@ -162,38 +153,24 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private FieldWriter createTimestampWriter(int precision, boolean isNullable) {
+    private FieldWriter createTimestampWriter(int precision) {
         if (precision <= 3) {
-            return new TimestampMillsWriter(precision, isNullable);
+            return new TimestampMillsWriter(precision);
         } else if (precision > 6) {
-            return new TimestampInt96Writer(precision, isNullable);
+            return new TimestampInt96Writer(precision);
         } else {
-            return new TimestampMicrosWriter(precision, isNullable);
+            return new TimestampMicrosWriter(precision);
         }
     }
 
-    private abstract static class FieldWriter {
-
-        private final boolean isNullable;
-
-        public FieldWriter(boolean isNullable) {
-            this.isNullable = isNullable;
-        }
-
-        abstract void write(InternalRow row, int ordinal);
+    private interface FieldWriter {
 
-        abstract void write(InternalArray arrayData, int ordinal);
+        void write(InternalRow row, int ordinal);
 
-        public boolean isNullable() {
-            return isNullable;
-        }
+        void write(InternalArray arrayData, int ordinal);
     }
 
-    private class BooleanWriter extends FieldWriter {
-
-        public BooleanWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class BooleanWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -210,11 +187,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class ByteWriter extends FieldWriter {
-
-        public ByteWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class ByteWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -231,11 +204,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class ShortWriter extends FieldWriter {
-
-        public ShortWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class ShortWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -252,11 +221,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class LongWriter extends FieldWriter {
-
-        public LongWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class LongWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -273,11 +238,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class FloatWriter extends FieldWriter {
-
-        public FloatWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class FloatWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -294,11 +255,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class DoubleWriter extends FieldWriter {
-
-        public DoubleWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class DoubleWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -315,11 +272,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class StringWriter extends FieldWriter {
-
-        public StringWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class StringWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -336,11 +289,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class BinaryWriter extends FieldWriter {
-
-        public BinaryWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class BinaryWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -357,11 +306,7 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class IntWriter extends FieldWriter {
-
-        public IntWriter(boolean isNullable) {
-            super(isNullable);
-        }
+    private class IntWriter implements FieldWriter {
 
         @Override
         public void write(InternalRow row, int ordinal) {
@@ -378,12 +323,11 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class TimestampMillsWriter extends FieldWriter {
+    private class TimestampMillsWriter implements FieldWriter {
 
         private final int precision;
 
-        private TimestampMillsWriter(int precision, boolean isNullable) {
-            super(isNullable);
+        private TimestampMillsWriter(int precision) {
             checkArgument(precision <= 3);
             this.precision = precision;
         }
@@ -403,12 +347,11 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class TimestampMicrosWriter extends FieldWriter {
+    private class TimestampMicrosWriter implements FieldWriter {
 
         private final int precision;
 
-        private TimestampMicrosWriter(int precision, boolean isNullable) {
-            super(isNullable);
+        private TimestampMicrosWriter(int precision) {
             checkArgument(precision > 3);
             checkArgument(precision <= 6);
             this.precision = precision;
@@ -429,12 +372,11 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class TimestampInt96Writer extends FieldWriter {
+    private class TimestampInt96Writer implements FieldWriter {
 
         private final int precision;
 
-        private TimestampInt96Writer(int precision, boolean isNullable) {
-            super(isNullable);
+        private TimestampInt96Writer(int precision) {
             checkArgument(precision > 6);
             this.precision = precision;
         }
@@ -455,7 +397,7 @@ public class ParquetRowDataWriter {
     }
 
     /** It writes a map field to parquet, both key and value are nullable. */
-    private class MapWriter extends FieldWriter {
+    private class MapWriter implements FieldWriter {
 
         private final String repeatedGroupName;
         private final String keyName;
@@ -463,9 +405,7 @@ public class ParquetRowDataWriter {
         private final FieldWriter keyWriter;
         private final FieldWriter valueWriter;
 
-        private MapWriter(
-                DataType keyType, DataType valueType, GroupType groupType, boolean isNullable) {
-            super(isNullable);
+        private MapWriter(DataType keyType, DataType valueType, GroupType groupType) {
             // Get the internal map structure (MAP_KEY_VALUE)
             GroupType repeatedType = groupType.getType(0).asGroupType();
             this.repeatedGroupName = repeatedType.getName();
@@ -529,14 +469,13 @@ public class ParquetRowDataWriter {
     }
 
     /** It writes an array type field to parquet. */
-    private class ArrayWriter extends FieldWriter {
+    private class ArrayWriter implements FieldWriter {
 
         private final String elementName;
         private final FieldWriter elementWriter;
         private final String repeatedGroupName;
 
-        private ArrayWriter(DataType t, GroupType groupType, boolean isNullable) {
-            super(isNullable);
+        private ArrayWriter(DataType t, GroupType groupType) {
             // Get the internal array structure
             GroupType repeatedType = groupType.getType(0).asGroupType();
             this.repeatedGroupName = repeatedType.getName();
@@ -580,17 +519,19 @@ public class ParquetRowDataWriter {
     }
 
     /** It writes a row type field to parquet. */
-    private class RowWriter extends FieldWriter {
+    private class RowWriter implements FieldWriter {
         private final FieldWriter[] fieldWriters;
         private final String[] fieldNames;
+        private final boolean[] isNullable;
 
-        public RowWriter(RowType rowType, GroupType groupType, boolean isNullable) {
-            super(isNullable);
+        public RowWriter(RowType rowType, GroupType groupType) {
             this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
             List<DataType> fieldTypes = rowType.getFieldTypes();
             this.fieldWriters = new FieldWriter[rowType.getFieldCount()];
+            this.isNullable = new boolean[rowType.getFieldCount()];
             for (int i = 0; i < fieldWriters.length; i++) {
                 fieldWriters[i] = createWriter(fieldTypes.get(i), groupType.getType(i));
+                isNullable[i] = fieldTypes.get(i).isNullable();
             }
         }
 
@@ -604,11 +545,15 @@ public class ParquetRowDataWriter {
                     writer.write(row, i);
                     recordConsumer.endField(fieldName, i);
                 } else {
-                    if (!fieldWriters[i].isNullable()) {
+                    if (!isNullable[i]) {
                         throw new IllegalArgumentException(
-                                format(
-                                        "Parquet does not support null values 
in non-nullable fields. Field name : %s expected not null but found null",
-                                        fieldNames[i]));
+                                String.format(
+                                        "Field '%s' expected not null but 
found null value. A possible cause is that the "
+                                                + "table used %s or %s 
merge-engine and the aggregate function produced "
+                                                + "null value when 
retracting.",
+                                        fieldNames[i],
+                                        CoreOptions.MergeEngine.PARTIAL_UPDATE,
+                                        CoreOptions.MergeEngine.AGGREGATE));
                     }
                 }
             }
@@ -631,17 +576,15 @@ public class ParquetRowDataWriter {
         }
     }
 
-    private class VariantWriter extends FieldWriter {
+    private class VariantWriter implements FieldWriter {
 
         @Nullable private final VariantSchema variantSchema;
         @Nullable private final RowWriter shreddedVariantWriter;
 
-        public VariantWriter(
-                boolean isNullable, GroupType groupType, @Nullable RowType shreddingSchema) {
-            super(isNullable);
+        public VariantWriter(GroupType groupType, @Nullable RowType shreddingSchema) {
             if (shreddingSchema != null) {
                 variantSchema = PaimonShreddingUtils.buildVariantSchema(shreddingSchema);
-                shreddedVariantWriter = new RowWriter(shreddingSchema, groupType, isNullable);
+                shreddedVariantWriter = new RowWriter(shreddingSchema, groupType);
             } else {
                 variantSchema = null;
                 shreddedVariantWriter = null;
@@ -695,18 +638,14 @@ public class ParquetRowDataWriter {
         return Binary.fromConstantByteBuffer(buf);
     }
 
-    private FieldWriter createDecimalWriter(int precision, int scale) {
+    private FieldWriter createDecimalWriter(int precision, int scale) {
         checkArgument(
                 precision <= DecimalType.MAX_PRECISION,
                 "Decimal precision %s exceeds max precision %s",
                 precision,
                 DecimalType.MAX_PRECISION);
 
-        class Int32Writer extends FieldWriter {
-
-            public Int32Writer(boolean isNullable) {
-                super(isNullable);
-            }
+        class Int32Writer implements FieldWriter {
 
             @Override
             public void write(InternalArray arrayData, int ordinal) {
@@ -726,11 +665,7 @@ public class ParquetRowDataWriter {
             }
         }
 
-        class Int64Writer extends FieldWriter {
-
-            public Int64Writer(boolean isNullable) {
-                super(isNullable);
-            }
+        class Int64Writer implements FieldWriter {
 
             @Override
             public void write(InternalArray arrayData, int ordinal) {
@@ -750,12 +685,11 @@ public class ParquetRowDataWriter {
             }
         }
 
-        class UnscaledBytesWriter extends FieldWriter {
+        class UnscaledBytesWriter implements FieldWriter {
             private final int numBytes;
             private final byte[] decimalBuffer;
 
-            private UnscaledBytesWriter(boolean isNullable) {
-                super(isNullable);
+            private UnscaledBytesWriter() {
                 this.numBytes = computeMinBytesForDecimalPrecision(precision);
                 this.decimalBuffer = new byte[numBytes];
             }
@@ -789,11 +723,11 @@ public class ParquetRowDataWriter {
         }
 
         if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-            return new Int32Writer(isNullable);
+            return new Int32Writer();
         } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-            return new Int64Writer(isNullable);
+            return new Int64Writer();
         } else {
-            return new UnscaledBytesWriter(isNullable);
+            return new UnscaledBytesWriter();
         }
     }
 }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
index 6487436431..2a0c7ab6c5 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.orc.writer.RowDataVectorizer;
 import org.apache.paimon.format.orc.writer.Vectorizer;
 import org.apache.paimon.fs.local.LocalFileIO.LocalPositionOutputStream;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 
 import org.apache.hadoop.fs.Path;
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -49,7 +50,9 @@ class OrcWriterFactoryTest {
                 new TestOrcWriterFactory(
                         new RowDataVectorizer(
                                 TypeDescription.fromString("struct<_col0:string,_col1:int>"),
-                                new DataType[] {DataTypes.STRING(), DataTypes.INT()},
+                                Arrays.asList(
+                                        new DataField(0, "f0", 
DataTypes.STRING()),
+                                        new DataField(1, "f1", 
DataTypes.INT())),
                                 true),
                         memoryManager);
         factory.create(new LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4");