This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2d28093782 [format] Format writer check null value for non-null field and throw useful message (#6213)
2d28093782 is described below
commit 2d28093782e444e3f0d26c4b788ec292cdf6f947
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 9 13:31:22 2025 +0800
[format] Format writer check null value for non-null field and throw useful message (#6213)
---
.../paimon/arrow/writer/ArrowFieldWriter.java | 12 +-
.../apache/paimon/format/FormatReadWriteTest.java | 23 +++
.../paimon/flink/ContinuousFileStoreITCase.java | 8 +-
.../paimon/format/avro/AvroRowDatumWriter.java | 11 +-
.../paimon/format/avro/FieldWriterFactory.java | 27 ++-
.../apache/paimon/format/orc/OrcFileFormat.java | 10 +-
.../format/orc/writer/RowDataVectorizer.java | 36 +++-
.../parquet/writer/ParquetRowDataWriter.java | 202 +++++++--------------
.../paimon/format/orc/OrcWriterFactoryTest.java | 7 +-
9 files changed, 164 insertions(+), 172 deletions(-)
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java
index 77c66cc2b7..e785d48bf0 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.arrow.writer;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.DataGetters;
import org.apache.paimon.data.columnar.ColumnVector;
@@ -25,8 +26,6 @@ import org.apache.arrow.vector.FieldVector;
import javax.annotation.Nullable;
-import static java.lang.String.format;
-
/** A reusable writer to convert a field into Arrow {@link FieldVector}. */
public abstract class ArrowFieldWriter {
@@ -73,9 +72,12 @@ public abstract class ArrowFieldWriter {
if (getters.isNullAt(pos)) {
if (!isNullable) {
throw new IllegalArgumentException(
- format(
- "Arrow does not support null values in non-nullable fields. Field name : %s expected not null but found null",
- fieldVector.getName()));
+ String.format(
+ "Field '%s' expected not null but found null value. A possible cause is that the table used "
+ + "%s or %s merge-engine and the aggregate function produced null value when retracting.",
+ fieldVector.getName(),
+ CoreOptions.MergeEngine.PARTIAL_UPDATE,
+ CoreOptions.MergeEngine.AGGREGATE));
}
fieldVector.setNull(rowIndex);
} else {
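All four format writers in this commit converge on the same fail-fast pattern: when a field declared non-nullable arrives as null, raise an IllegalArgumentException that names the field and points at the merge engines that can retract a value to null. A minimal self-contained sketch of that pattern (the NullGuard class and its parameters are illustrative, not Paimon API; only the message shape mirrors the commit):

// Sketch only: the shared fail-fast null guard; NullGuard is a hypothetical name.
public final class NullGuard {

    private NullGuard() {}

    // Fail fast when a non-nullable field carries a null value.
    public static void checkNotNull(boolean isNullable, boolean isNull, String fieldName) {
        if (isNull && !isNullable) {
            throw new IllegalArgumentException(
                    String.format(
                            "Field '%s' expected not null but found null value. A possible cause is "
                                    + "that the table used partial-update or aggregation merge-engine "
                                    + "and the aggregate function produced null value when retracting.",
                            fieldName));
        }
    }

    public static void main(String[] args) {
        checkNotNull(true, true, "f0");  // nullable field holding null: fine
        checkNotNull(false, true, "f0"); // non-nullable field holding null: throws
    }
}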
diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index 14698a47f8..a54796f06b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -57,7 +57,10 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import static org.apache.paimon.data.BinaryString.fromString;
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
/** Test Base class for Format. */
public abstract class FormatReadWriteTest {
@@ -249,6 +252,26 @@ public abstract class FormatReadWriteTest {
assertThat(array.getVariant(1).toJson()).isEqualTo("{\"age\":45,\"city\":\"Beijing\"}");
}
+ @Test
+ public void testWriteNullToNonNullField() {
+ FileFormat format = fileFormat();
+ String identifier = format.getFormatIdentifier();
+ // no agg for these formats now
+ assumeTrue(!identifier.equals("csv") && !identifier.equals("json"));
+
+ FormatWriterFactory factory =
+ format.createWriterFactory(
+ RowType.builder().field("f0", DataTypes.INT().notNull()).build());
+
+ assertThatThrownBy(() -> write(factory, file, GenericRow.of((Object) null)))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Field 'f0' expected not null but found null value. A possible cause is "
+ + "that the table used partial-update or aggregation merge-engine and the aggregate "
+ + "function produced null value when retracting."));
+ }
+
protected void write(FormatWriterFactory factory, Path file, InternalRow... rows)
throws IOException {
FormatWriter writer;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 689afe09fa..317ca24175 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.paimon.SnapshotTest.newSnapshotManager;
@@ -641,7 +642,7 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
}
@Test
- public void testAvroRetractNotNullField() {
+ public void testAvroRetractNotNullField() throws ExecutionException, InterruptedException {
List<Row> input =
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, "A"),
Row.ofKind(RowKind.DELETE, 1, "A"));
@@ -661,7 +662,8 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
() -> sEnv.executeSql("INSERT INTO avro_sink select * from source").await())
.satisfies(
anyCauseMatches(
- RuntimeException.class,
- "Caught NullPointerException, the possible
reason is you have set following options together"));
+ IllegalArgumentException.class,
+ "Field 'a' expected not null but found null
value. A possible cause is that "
+ + "the table used partial-update or
aggregation merge-engine and the aggregate function produced null value when
retracting."));
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
index d302451625..c2bd81d003 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
@@ -56,15 +56,6 @@ public class AvroRowDatumWriter implements DatumWriter<InternalRow> {
// top Row is a UNION type
out.writeIndex(1);
}
- try {
- this.writer.writeRow(datum, out);
- } catch (NullPointerException npe) {
- throw new RuntimeException(
- "Caught NullPointerException, the possible reason is you
have set following options together:\n"
- + " 1. file.format = avro;\n"
- + " 2. merge-function =
aggregation/partial-update;\n"
- + " 3. some fields are not null.",
- npe);
- }
+ this.writer.writeRow(datum, out);
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
index a8b659a749..e108959ffe 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.avro;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.DataGetters;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
@@ -240,14 +241,20 @@ public class FieldWriterFactory implements AvroSchemaVisitor<FieldWriter> {
public class RowWriter implements FieldWriter {
private final FieldWriter[] fieldWriters;
+ private final String[] fieldNames;
+ private final boolean[] isNullable;
private RowWriter(Schema schema, List<DataField> fields) {
List<Schema.Field> schemaFields = schema.getFields();
this.fieldWriters = new FieldWriter[schemaFields.size()];
+ this.fieldNames = new String[schemaFields.size()];
+ this.isNullable = new boolean[schemaFields.size()];
for (int i = 0, fieldsSize = schemaFields.size(); i < fieldsSize; i++) {
Schema.Field field = schemaFields.get(i);
DataType type = fields.get(i).type();
fieldWriters[i] = visit(field.schema(), type);
+ fieldNames[i] = field.name();
+ isNullable[i] = type.isNullable();
}
}
@@ -258,8 +265,24 @@ public class FieldWriterFactory implements AvroSchemaVisitor<FieldWriter> {
}
public void writeRow(InternalRow row, Encoder encoder) throws IOException {
- for (int i = 0; i < fieldWriters.length; i += 1) {
- fieldWriters[i].write(row, i, encoder);
+ int i = 0;
+ try {
+ for (; i < fieldWriters.length; i += 1) {
+ fieldWriters[i].write(row, i, encoder);
+ }
+ } catch (NullPointerException npe) {
+ if (!isNullable[i] && row.isNullAt(i)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field '%s' expected not null but found
null value. A possible cause is that the "
+ + "table used %s or %s
merge-engine and the aggregate function produced "
+ + "null value when retracting.",
+ fieldNames[i],
+ CoreOptions.MergeEngine.PARTIAL_UPDATE,
+ CoreOptions.MergeEngine.AGGREGATE));
+ } else {
+ throw npe;
+ }
}
}
}
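Note the design choice above: Avro's encoders throw a bare NullPointerException from deep inside the write path, so rather than pre-checking every field on the hot loop, the writer tracks the current field index and inspects nullability only after catching the NPE; anything it cannot attribute to a null in a non-null field is rethrown untouched. A compact sketch of that catch-and-diagnose shape (FieldSink and the Object[] row are stand-ins, not Paimon types):

// Sketch only: diagnose an NPE after the fact instead of checking every field upfront.
public final class DiagnoseOnNpe {

    // Stand-in for a per-field writer that, like Avro encoders, NPEs on null input.
    interface FieldSink {
        void write(Object value);
    }

    static void writeRow(FieldSink[] writers, Object[] row, String[] names, boolean[] isNullable) {
        int i = 0;
        try {
            for (; i < writers.length; i++) {
                writers[i].write(row[i]);
            }
        } catch (NullPointerException npe) {
            // Translate only when the failing field provably held an illegal null.
            if (!isNullable[i] && row[i] == null) {
                throw new IllegalArgumentException(
                        String.format("Field '%s' expected not null but found null value.", names[i]));
            }
            throw npe; // any other NPE is a genuine bug; surface it unchanged
        }
    }
}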
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 99e7a9efdd..76286b8cce 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -57,7 +57,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.format.OrcOptions.ORC_TIMESTAMP_LTZ_LEGACY_TYPE;
-import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
/** Orc {@link FileFormat}. */
@ThreadSafe
@@ -144,12 +143,11 @@ public class OrcFileFormat extends FileFormat {
*/
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
- DataType refinedType = refineDataType(type);
- DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]);
-
- TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
+ RowType refinedType = (RowType) refineDataType(type);
+ TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema(refinedType);
Vectorizer<InternalRow> vectorizer =
- new RowDataVectorizer(typeDescription, orcTypes, legacyTimestampLtzType);
+ new RowDataVectorizer(
+ typeDescription, refinedType.getFields(), legacyTimestampLtzType);
return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index 51c7acf170..5cb37f191a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -18,30 +18,36 @@
package org.apache.paimon.format.orc.writer;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Collectors;
/** A {@link Vectorizer} of {@link InternalRow} type element. */
public class RowDataVectorizer extends Vectorizer<InternalRow> {
- private final List<FieldWriter> fieldWriters;
+ private final FieldWriter[] fieldWriters;
+ private final String[] fieldNames;
+ private final boolean[] isNullable;
public RowDataVectorizer(
- TypeDescription schema, DataType[] fieldTypes, boolean legacyTimestampLtzType) {
+ TypeDescription schema, List<DataField> dataFields, boolean legacyTimestampLtzType) {
super(schema);
FieldWriterFactory fieldWriterFactory = new FieldWriterFactory(legacyTimestampLtzType);
- this.fieldWriters =
- Arrays.stream(fieldTypes)
- .map(t -> t.accept(fieldWriterFactory))
- .collect(Collectors.toList());
+ this.fieldWriters = new FieldWriter[dataFields.size()];
+ this.fieldNames = new String[dataFields.size()];
+ this.isNullable = new boolean[dataFields.size()];
+ for (int i = 0; i < dataFields.size(); i++) {
+ DataField field = dataFields.get(i);
+ fieldWriters[i] = field.type().accept(fieldWriterFactory);
+ fieldNames[i] = field.name();
+ isNullable[i] = field.type().isNullable();
+ }
}
@Override
@@ -50,10 +56,20 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {
for (int i = 0; i < row.getFieldCount(); ++i) {
ColumnVector fieldColumn = batch.cols[i];
if (row.isNullAt(i)) {
+ if (!isNullable[i]) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Field '%s' expected not null but found
null value. A possible cause is that the "
+ + "table used %s or %s
merge-engine and the aggregate function produced "
+ + "null value when retracting.",
+ fieldNames[i],
+ CoreOptions.MergeEngine.PARTIAL_UPDATE,
+ CoreOptions.MergeEngine.AGGREGATE));
+ }
fieldColumn.noNulls = false;
fieldColumn.isNull[rowId] = true;
} else {
- fieldWriters.get(i).write(rowId, fieldColumn, row, i);
+ fieldWriters[i].write(rowId, fieldColumn, row, i);
}
}
}
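Contrast with the Avro writer above: the ORC vectorizer already branches on row.isNullAt(i) for every field, so its check is eager and essentially free; the constructor change exists to precompute field names and nullability flags once per writer instead of consulting the DataType per row. A simplified sketch of that loop (the Row interface below is a placeholder, not an ORC or Paimon type):

// Sketch only: eager per-field check inside a loop that already tests for null.
public final class EagerCheckSketch {

    interface Row {
        int getFieldCount();
        boolean isNullAt(int i);
    }

    static void vectorize(Row row, boolean[] isNullable, String[] fieldNames) {
        for (int i = 0; i < row.getFieldCount(); i++) {
            if (row.isNullAt(i)) {
                if (!isNullable[i]) { // one array lookup; no per-row type inspection
                    throw new IllegalArgumentException(
                            "Field '" + fieldNames[i] + "' expected not null but found null value.");
                }
                // ... otherwise mark the slot null in the column vector
            } else {
                // ... delegate to the per-field writer
            }
        }
    }
}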
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
index fe62dc2174..c00734b024 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format.parquet.writer;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -55,7 +56,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import static java.lang.String.format;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -75,7 +75,7 @@ public class ParquetRowDataWriter {
RecordConsumer recordConsumer, RowType rowType, GroupType schema, Configuration conf) {
this.conf = conf;
this.recordConsumer = recordConsumer;
- this.rowWriter = new RowWriter(rowType, schema, false);
+ this.rowWriter = new RowWriter(rowType, schema);
}
/**
@@ -94,37 +94,35 @@ public class ParquetRowDataWriter {
switch (t.getTypeRoot()) {
case CHAR:
case VARCHAR:
- return new StringWriter(t.isNullable());
+ return new StringWriter();
case BOOLEAN:
- return new BooleanWriter(t.isNullable());
+ return new BooleanWriter();
case BINARY:
case VARBINARY:
- return new BinaryWriter(t.isNullable());
+ return new BinaryWriter();
case DECIMAL:
DecimalType decimalType = (DecimalType) t;
- return createDecimalWriter(
- decimalType.getPrecision(), decimalType.getScale(), t.isNullable());
+ return createDecimalWriter(decimalType.getPrecision(), decimalType.getScale());
case TINYINT:
- return new ByteWriter(t.isNullable());
+ return new ByteWriter();
case SMALLINT:
- return new ShortWriter(t.isNullable());
+ return new ShortWriter();
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case INTEGER:
- return new IntWriter(t.isNullable());
+ return new IntWriter();
case BIGINT:
- return new LongWriter(t.isNullable());
+ return new LongWriter();
case FLOAT:
- return new FloatWriter(t.isNullable());
+ return new FloatWriter();
case DOUBLE:
- return new DoubleWriter(t.isNullable());
+ return new DoubleWriter();
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) t;
- return createTimestampWriter(timestampType.getPrecision(), t.isNullable());
+ return createTimestampWriter(timestampType.getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) t;
- return createTimestampWriter(
- localZonedTimestampType.getPrecision(), t.isNullable());
+ return createTimestampWriter(localZonedTimestampType.getPrecision());
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -134,26 +132,19 @@ public class ParquetRowDataWriter {
if (t instanceof ArrayType
&& annotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
- return new ArrayWriter(((ArrayType) t).getElementType(), groupType, t.isNullable());
+ return new ArrayWriter(((ArrayType) t).getElementType(), groupType);
} else if (t instanceof MapType
&& annotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return new MapWriter(
- ((MapType) t).getKeyType(),
- ((MapType) t).getValueType(),
- groupType,
- t.isNullable());
+ ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
} else if (t instanceof MultisetType
&& annotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return new MapWriter(
- ((MultisetType) t).getElementType(),
- new IntType(false),
- groupType,
- t.isNullable());
+ ((MultisetType) t).getElementType(), new IntType(false), groupType);
} else if (t instanceof RowType && type instanceof GroupType) {
- return new RowWriter((RowType) t, groupType, t.isNullable());
+ return new RowWriter((RowType) t, groupType);
} else if (t instanceof VariantType && type instanceof GroupType) {
return new VariantWriter(
- t.isNullable(),
groupType,
VariantUtils.extractShreddingSchemaFromConf(conf, type.getName()));
} else {
@@ -162,38 +153,24 @@ public class ParquetRowDataWriter {
}
}
- private FieldWriter createTimestampWriter(int precision, boolean isNullable) {
+ private FieldWriter createTimestampWriter(int precision) {
if (precision <= 3) {
- return new TimestampMillsWriter(precision, isNullable);
+ return new TimestampMillsWriter(precision);
} else if (precision > 6) {
- return new TimestampInt96Writer(precision, isNullable);
+ return new TimestampInt96Writer(precision);
} else {
- return new TimestampMicrosWriter(precision, isNullable);
+ return new TimestampMicrosWriter(precision);
}
}
- private abstract static class FieldWriter {
-
- private final boolean isNullable;
-
- public FieldWriter(boolean isNullable) {
- this.isNullable = isNullable;
- }
-
- abstract void write(InternalRow row, int ordinal);
+ private interface FieldWriter {
- abstract void write(InternalArray arrayData, int ordinal);
+ void write(InternalRow row, int ordinal);
- public boolean isNullable() {
- return isNullable;
- }
+ void write(InternalArray arrayData, int ordinal);
}
- private class BooleanWriter extends FieldWriter {
-
- public BooleanWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class BooleanWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -210,11 +187,7 @@ public class ParquetRowDataWriter {
}
}
- private class ByteWriter extends FieldWriter {
-
- public ByteWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class ByteWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -231,11 +204,7 @@ public class ParquetRowDataWriter {
}
}
- private class ShortWriter extends FieldWriter {
-
- public ShortWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class ShortWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -252,11 +221,7 @@ public class ParquetRowDataWriter {
}
}
- private class LongWriter extends FieldWriter {
-
- public LongWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class LongWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -273,11 +238,7 @@ public class ParquetRowDataWriter {
}
}
- private class FloatWriter extends FieldWriter {
-
- public FloatWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class FloatWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -294,11 +255,7 @@ public class ParquetRowDataWriter {
}
}
- private class DoubleWriter extends FieldWriter {
-
- public DoubleWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class DoubleWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -315,11 +272,7 @@ public class ParquetRowDataWriter {
}
}
- private class StringWriter extends FieldWriter {
-
- public StringWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class StringWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -336,11 +289,7 @@ public class ParquetRowDataWriter {
}
}
- private class BinaryWriter extends FieldWriter {
-
- public BinaryWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class BinaryWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -357,11 +306,7 @@ public class ParquetRowDataWriter {
}
}
- private class IntWriter extends FieldWriter {
-
- public IntWriter(boolean isNullable) {
- super(isNullable);
- }
+ private class IntWriter implements FieldWriter {
@Override
public void write(InternalRow row, int ordinal) {
@@ -378,12 +323,11 @@ public class ParquetRowDataWriter {
}
}
- private class TimestampMillsWriter extends FieldWriter {
+ private class TimestampMillsWriter implements FieldWriter {
private final int precision;
- private TimestampMillsWriter(int precision, boolean isNullable) {
- super(isNullable);
+ private TimestampMillsWriter(int precision) {
checkArgument(precision <= 3);
this.precision = precision;
}
@@ -403,12 +347,11 @@ public class ParquetRowDataWriter {
}
}
- private class TimestampMicrosWriter extends FieldWriter {
+ private class TimestampMicrosWriter implements FieldWriter {
private final int precision;
- private TimestampMicrosWriter(int precision, boolean isNullable) {
- super(isNullable);
+ private TimestampMicrosWriter(int precision) {
checkArgument(precision > 3);
checkArgument(precision <= 6);
this.precision = precision;
@@ -429,12 +372,11 @@ public class ParquetRowDataWriter {
}
}
- private class TimestampInt96Writer extends FieldWriter {
+ private class TimestampInt96Writer implements FieldWriter {
private final int precision;
- private TimestampInt96Writer(int precision, boolean isNullable) {
- super(isNullable);
+ private TimestampInt96Writer(int precision) {
checkArgument(precision > 6);
this.precision = precision;
}
@@ -455,7 +397,7 @@ public class ParquetRowDataWriter {
}
/** It writes a map field to parquet, both key and value are nullable. */
- private class MapWriter extends FieldWriter {
+ private class MapWriter implements FieldWriter {
private final String repeatedGroupName;
private final String keyName;
@@ -463,9 +405,7 @@ public class ParquetRowDataWriter {
private final FieldWriter keyWriter;
private final FieldWriter valueWriter;
- private MapWriter(
- DataType keyType, DataType valueType, GroupType groupType, boolean isNullable) {
- super(isNullable);
+ private MapWriter(DataType keyType, DataType valueType, GroupType groupType) {
// Get the internal map structure (MAP_KEY_VALUE)
GroupType repeatedType = groupType.getType(0).asGroupType();
this.repeatedGroupName = repeatedType.getName();
@@ -529,14 +469,13 @@ public class ParquetRowDataWriter {
}
/** It writes an array type field to parquet. */
- private class ArrayWriter extends FieldWriter {
+ private class ArrayWriter implements FieldWriter {
private final String elementName;
private final FieldWriter elementWriter;
private final String repeatedGroupName;
- private ArrayWriter(DataType t, GroupType groupType, boolean isNullable) {
- super(isNullable);
+ private ArrayWriter(DataType t, GroupType groupType) {
// Get the internal array structure
GroupType repeatedType = groupType.getType(0).asGroupType();
this.repeatedGroupName = repeatedType.getName();
@@ -580,17 +519,19 @@ public class ParquetRowDataWriter {
}
/** It writes a row type field to parquet. */
- private class RowWriter extends FieldWriter {
+ private class RowWriter implements FieldWriter {
private final FieldWriter[] fieldWriters;
private final String[] fieldNames;
+ private final boolean[] isNullable;
- public RowWriter(RowType rowType, GroupType groupType, boolean isNullable) {
- super(isNullable);
+ public RowWriter(RowType rowType, GroupType groupType) {
this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
List<DataType> fieldTypes = rowType.getFieldTypes();
this.fieldWriters = new FieldWriter[rowType.getFieldCount()];
+ this.isNullable = new boolean[rowType.getFieldCount()];
for (int i = 0; i < fieldWriters.length; i++) {
fieldWriters[i] = createWriter(fieldTypes.get(i), groupType.getType(i));
+ isNullable[i] = fieldTypes.get(i).isNullable();
}
}
@@ -604,11 +545,15 @@ public class ParquetRowDataWriter {
writer.write(row, i);
recordConsumer.endField(fieldName, i);
} else {
- if (!fieldWriters[i].isNullable()) {
+ if (!isNullable[i]) {
throw new IllegalArgumentException(
- format(
- "Parquet does not support null values
in non-nullable fields. Field name : %s expected not null but found null",
- fieldNames[i]));
+ String.format(
+ "Field '%s' expected not null but
found null value. A possible cause is that the "
+ + "table used %s or %s
merge-engine and the aggregate function produced "
+ + "null value when
retracting.",
+ fieldNames[i],
+ CoreOptions.MergeEngine.PARTIAL_UPDATE,
+ CoreOptions.MergeEngine.AGGREGATE));
}
}
}
@@ -631,17 +576,15 @@ public class ParquetRowDataWriter {
}
}
- private class VariantWriter extends FieldWriter {
+ private class VariantWriter implements FieldWriter {
@Nullable private final VariantSchema variantSchema;
@Nullable private final RowWriter shreddedVariantWriter;
- public VariantWriter(
- boolean isNullable, GroupType groupType, @Nullable RowType shreddingSchema) {
- super(isNullable);
+ public VariantWriter(GroupType groupType, @Nullable RowType shreddingSchema) {
if (shreddingSchema != null) {
variantSchema = PaimonShreddingUtils.buildVariantSchema(shreddingSchema);
- shreddedVariantWriter = new RowWriter(shreddingSchema, groupType, isNullable);
+ shreddedVariantWriter = new RowWriter(shreddingSchema, groupType);
} else {
variantSchema = null;
shreddedVariantWriter = null;
@@ -695,18 +638,14 @@ public class ParquetRowDataWriter {
return Binary.fromConstantByteBuffer(buf);
}
- private FieldWriter createDecimalWriter(int precision, int scale) {
+ private FieldWriter createDecimalWriter(int precision, int scale) {
checkArgument(
precision <= DecimalType.MAX_PRECISION,
"Decimal precision %s exceeds max precision %s",
precision,
DecimalType.MAX_PRECISION);
- class Int32Writer extends FieldWriter {
-
- public Int32Writer(boolean isNullable) {
- super(isNullable);
- }
+ class Int32Writer implements FieldWriter {
@Override
public void write(InternalArray arrayData, int ordinal) {
@@ -726,11 +665,7 @@ public class ParquetRowDataWriter {
}
}
- class Int64Writer extends FieldWriter {
-
- public Int64Writer(boolean isNullable) {
- super(isNullable);
- }
+ class Int64Writer implements FieldWriter {
@Override
public void write(InternalArray arrayData, int ordinal) {
@@ -750,12 +685,11 @@ public class ParquetRowDataWriter {
}
}
- class UnscaledBytesWriter extends FieldWriter {
+ class UnscaledBytesWriter implements FieldWriter {
private final int numBytes;
private final byte[] decimalBuffer;
- private UnscaledBytesWriter(boolean isNullable) {
- super(isNullable);
+ private UnscaledBytesWriter() {
this.numBytes = computeMinBytesForDecimalPrecision(precision);
this.decimalBuffer = new byte[numBytes];
}
@@ -789,11 +723,11 @@ public class ParquetRowDataWriter {
}
if (ParquetSchemaConverter.is32BitDecimal(precision)) {
- return new Int32Writer(isNullable);
+ return new Int32Writer();
} else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
- return new Int64Writer(isNullable);
+ return new Int64Writer();
} else {
- return new UnscaledBytesWriter(isNullable);
+ return new UnscaledBytesWriter();
}
}
}
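The Parquet change is the largest because it also strips the isNullable field out of every leaf writer: FieldWriter shrinks to a stateless interface, and RowWriter, the only place the check can fire, keeps per-field nullability alongside the field names it already stored. Roughly (simplified stand-in types, not the real ParquetRowDataWriter internals):

// Sketch only: nullability hoisted out of the leaf writers into the row writer.
public final class RowWriterSketch {

    interface FieldWriter { // stateless; leaf writers no longer carry nullability
        void write(Object[] row, int ordinal);
    }

    static final class RowWriter {
        private final FieldWriter[] fieldWriters;
        private final String[] fieldNames;
        private final boolean[] isNullable; // owned here, checked here

        RowWriter(FieldWriter[] fieldWriters, String[] fieldNames, boolean[] isNullable) {
            this.fieldWriters = fieldWriters;
            this.fieldNames = fieldNames;
            this.isNullable = isNullable;
        }

        void writeRow(Object[] row) {
            for (int i = 0; i < fieldWriters.length; i++) {
                if (row[i] != null) {
                    fieldWriters[i].write(row, i);
                } else if (!isNullable[i]) {
                    throw new IllegalArgumentException(
                            "Field '" + fieldNames[i] + "' expected not null but found null value.");
                }
                // a legal null: the field is simply not emitted, which Parquet
                // reads back as null for an optional column
            }
        }
    }
}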
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
index 6487436431..2a0c7ab6c5 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.orc.writer.RowDataVectorizer;
import org.apache.paimon.format.orc.writer.Vectorizer;
import org.apache.paimon.fs.local.LocalFileIO.LocalPositionOutputStream;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.hadoop.fs.Path;
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -49,7 +50,9 @@ class OrcWriterFactoryTest {
new TestOrcWriterFactory(
new RowDataVectorizer(
TypeDescription.fromString("struct<_col0:string,_col1:int>"),
- new DataType[] {DataTypes.STRING(), DataTypes.INT()},
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.STRING()),
+ new DataField(1, "f1", DataTypes.INT())),
true),
memoryManager);
factory.create(new LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4");
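For reference, the updated construction path from the test above, with a hypothetical notNull() column added to show where the new check would fire (imports as in the test; DataTypes.INT().notNull() is the same call the FormatReadWriteTest diff uses):

// Adapted from the test diff; the notNull() column is added for illustration only.
List<DataField> fields =
        Arrays.asList(
                new DataField(0, "f0", DataTypes.STRING()),
                new DataField(1, "f1", DataTypes.INT().notNull()));
Vectorizer<InternalRow> vectorizer =
        new RowDataVectorizer(
                TypeDescription.fromString("struct<_col0:string,_col1:int>"), fields, true);
// Vectorizing a row whose "f1" is null now raises IllegalArgumentException naming
// the field, where the old code silently wrote a null into the column batch.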