This is an automated email from the ASF dual-hosted git repository.
jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 979a6703a0 [BUG] Fixed avro format support for storing null (#8424)
979a6703a0 is described below
commit 979a6703a00d96c4cd3cddb53b57008f6fb6f128
Author: Tu-maimes <[email protected]>
AuthorDate: Fri Jan 3 09:04:01 2025 +0800
[BUG] Fixed avro format support for storing null (#8424)
Co-authored-by: Tu-maimes <[email protected]>
---
.../seatunnel/format/avro/AvroToRowConverter.java | 4 ++
.../SeaTunnelRowTypeToAvroSchemaConverter.java | 43 ++++++++++++-----
.../format/avro/AvroSerializationSchemaTest.java | 55 ++++++++++++++++++++++
3 files changed, 89 insertions(+), 13 deletions(-)
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
index e80b78ee83..84b1063600 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -38,6 +38,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class AvroToRowConverter implements Serializable {
@@ -82,6 +83,9 @@ public class AvroToRowConverter implements Serializable {
}
private Object convertField(SeaTunnelDataType<?> dataType, Object val) {
+ if (Objects.isNull(val)) {
+ return null;
+ }
switch (dataType.getSqlType()) {
case STRING:
return val.toString();
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
index 0a990f3fc6..56a65d3e53 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/SeaTunnelRowTypeToAvroSchemaConverter.java
@@ -58,28 +58,39 @@ public class SeaTunnelRowTypeToAvroSchemaConverter {
switch (seaTunnelDataType.getSqlType()) {
case STRING:
- return Schema.create(Schema.Type.STRING);
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.STRING));
case BYTES:
- return Schema.create(Schema.Type.BYTES);
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.BYTES));
case TINYINT:
case SMALLINT:
case INT:
- return Schema.create(Schema.Type.INT);
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.INT));
case BIGINT:
- return Schema.create(Schema.Type.LONG);
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.LONG));
case FLOAT:
- return Schema.create(Schema.Type.FLOAT);
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.FLOAT));
case DOUBLE:
- return Schema.create(Schema.Type.DOUBLE);
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.DOUBLE));
case BOOLEAN:
- return Schema.create(Schema.Type.BOOLEAN);
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
Schema.create(Schema.Type.BOOLEAN));
case MAP:
SeaTunnelDataType<?> valueType = ((MapType<?, ?>)
seaTunnelDataType).getValueType();
- return
Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType));
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+
Schema.createMap(seaTunnelDataType2AvroDataType(fieldName, valueType)));
case ARRAY:
SeaTunnelDataType<?> elementType =
((ArrayType<?, ?>) seaTunnelDataType).getElementType();
- return
Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType));
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+
Schema.createArray(seaTunnelDataType2AvroDataType(fieldName, elementType)));
case ROW:
SeaTunnelDataType<?>[] fieldTypes =
((SeaTunnelRowType) seaTunnelDataType).getFieldTypes();
@@ -93,12 +104,18 @@ public class SeaTunnelRowTypeToAvroSchemaConverter {
int precision = ((DecimalType)
seaTunnelDataType).getPrecision();
int scale = ((DecimalType) seaTunnelDataType).getScale();
LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision,
scale);
- return decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ decimal.addToSchema(Schema.create(Schema.Type.BYTES)));
case TIMESTAMP:
- return LogicalTypes.localTimestampMillis()
- .addToSchema(Schema.create(Schema.Type.LONG));
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ LogicalTypes.localTimestampMillis()
+ .addToSchema(Schema.create(Schema.Type.LONG)));
case DATE:
- return
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+ return Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)));
case NULL:
return Schema.create(Schema.Type.NULL);
default:
diff --git a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
index 42b8029f16..52ba7d76e6 100644
--- a/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-avro/src/test/java/org/apache/seatunnel/format/avro/AvroSerializationSchemaTest.java
@@ -177,4 +177,59 @@ class AvroSerializationSchemaTest {
LocalDateTime localDateTime1 = (LocalDateTime) subRow.getField(13);
Assertions.assertEquals(localDateTime1.compareTo(localDateTime), 0);
}
+
+ private SeaTunnelRow buildSeaTunnelRowValueNull() {
+ SeaTunnelRow subSeaTunnelRow = new SeaTunnelRow(14);
+ subSeaTunnelRow.setField(0, null);
+ subSeaTunnelRow.setField(1, null);
+ subSeaTunnelRow.setField(2, null);
+ subSeaTunnelRow.setField(3, null);
+ subSeaTunnelRow.setField(4, null);
+ subSeaTunnelRow.setField(5, null);
+ subSeaTunnelRow.setField(6, null);
+ subSeaTunnelRow.setField(7, null);
+ subSeaTunnelRow.setField(8, null);
+ subSeaTunnelRow.setField(9, null);
+ subSeaTunnelRow.setField(10, null);
+ subSeaTunnelRow.setField(11, null);
+ subSeaTunnelRow.setField(12, null);
+ subSeaTunnelRow.setField(13, null);
+
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(15);
+ seaTunnelRow.setField(0, null);
+ seaTunnelRow.setField(1, null);
+ seaTunnelRow.setField(2, null);
+ seaTunnelRow.setField(3, null);
+ seaTunnelRow.setField(4, null);
+ seaTunnelRow.setField(5, null);
+ seaTunnelRow.setField(6, null);
+ seaTunnelRow.setField(7, null);
+ seaTunnelRow.setField(8, null);
+ seaTunnelRow.setField(9, null);
+ seaTunnelRow.setField(10, null);
+ seaTunnelRow.setField(11, null);
+ seaTunnelRow.setField(12, null);
+ seaTunnelRow.setField(13, null);
+ seaTunnelRow.setField(14, subSeaTunnelRow);
+ return seaTunnelRow;
+ }
+
+ @Test
+ public void testSerializationValueNull() throws IOException {
+ SeaTunnelRowType rowType = buildSeaTunnelRowType();
+ CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "",
"", "test", rowType);
+ SeaTunnelRow seaTunnelRow = buildSeaTunnelRowValueNull();
+ AvroSerializationSchema serializationSchema = new
AvroSerializationSchema(rowType);
+ byte[] bytes = serializationSchema.serialize(seaTunnelRow);
+ AvroDeserializationSchema deserializationSchema =
+ new AvroDeserializationSchema(catalogTable);
+ SeaTunnelRow deserialize = deserializationSchema.deserialize(bytes);
+ String[] strArray1 = (String[]) seaTunnelRow.getField(1);
+ String[] strArray2 = (String[]) deserialize.getField(1);
+ Assertions.assertArrayEquals(strArray1, strArray2);
+ SeaTunnelRow subRow = (SeaTunnelRow) deserialize.getField(14);
+ Assertions.assertEquals(subRow.getField(9), null);
+ Assertions.assertEquals(subRow.getField(12), null);
+ Assertions.assertEquals(subRow.getField(13), null);
+ }
}