This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new bf03af26d6 NIFI-14897 Fixed Avro FIXED serialization to use GenericFixed (#10266)
bf03af26d6 is described below
commit bf03af26d65d4be9c6d7f1302acd71afdb51175d
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Sep 5 04:49:41 2025 +0200
NIFI-14897 Fixed Avro FIXED serialization to use GenericFixed (#10266)
Signed-off-by: David Handermann <[email protected]>
---
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 58 ++++++++---
.../org/apache/nifi/avro/TestAvroTypeUtil.java | 5 +-
.../nifi-record-serialization-services/pom.xml | 2 +
.../org/apache/nifi/avro/TestWriteAvroResult.java | 116 +++++++++++++++++++++
.../test/resources/avro/decimals-bytes-fixed.avsc | 28 +++++
.../src/test/resources/avro/fixed16.avsc | 16 +++
6 files changed, 211 insertions(+), 14 deletions(-)
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 48c93d6cfa..41bff6a99a 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -775,32 +775,66 @@ public class AvroTypeUtil {
final BigDecimal decimal = rawDecimal.scale() ==
desiredScale
? rawDecimal : rawDecimal.setScale(desiredScale,
RoundingMode.HALF_UP);
return fieldSchema.getType() == Type.BYTES
- ? new Conversions.DecimalConversion().toBytes(decimal,
fieldSchema, logicalType) //return GenericByte
- : new Conversions.DecimalConversion().toFixed(decimal,
fieldSchema, logicalType); //return GenericFixed
+ ? new Conversions.DecimalConversion().toBytes(decimal,
fieldSchema, logicalType)
+ : new Conversions.DecimalConversion().toFixed(decimal,
fieldSchema, logicalType);
}
if (rawValue instanceof byte[]) {
- return ByteBuffer.wrap((byte[]) rawValue);
+ final byte[] bytes = (byte[]) rawValue;
+ if (fieldSchema.getType() == Type.FIXED) {
+ final int expectedSize = fieldSchema.getFixedSize();
+ if (bytes.length != expectedSize) {
+ throw new IllegalTypeConversionException("Cannot
convert byte[] of length " + bytes.length +
+ " to FIXED(" + expectedSize + ") for field '"
+ fieldName + "'");
+ }
+ return new GenericData.Fixed(fieldSchema, bytes);
+ }
+ return ByteBuffer.wrap(bytes);
}
if (rawValue instanceof String) {
- return ByteBuffer.wrap(((String)
rawValue).getBytes(charset));
+ final byte[] bytes = ((String) rawValue).getBytes(charset);
+ if (fieldSchema.getType() == Type.FIXED) {
+ final int expectedSize = fieldSchema.getFixedSize();
+ if (bytes.length != expectedSize) {
+ throw new IllegalTypeConversionException("Cannot
convert String bytes of length " + bytes.length +
+ " to FIXED(" + expectedSize + ") for field '"
+ fieldName + "'");
+ }
+ return new GenericData.Fixed(fieldSchema, bytes);
+ }
+ return ByteBuffer.wrap(bytes);
}
if (rawValue instanceof Object[]) {
- if (fieldSchema.getType() == Type.FIXED &&
"INT96".equals(fieldSchema.getName())) {
- Object[] rawObjects = (Object[]) rawValue;
- byte[] rawBytes = new byte[rawObjects.length];
- for (int elementIndex = 0; elementIndex <
rawObjects.length; elementIndex++) {
- rawBytes[elementIndex] = (Byte)
rawObjects[elementIndex];
+ final Object[] rawObjects = (Object[]) rawValue;
+ final byte[] bytes = new byte[rawObjects.length];
+ for (int elementIndex = 0; elementIndex <
rawObjects.length; elementIndex++) {
+ final Object o = rawObjects[elementIndex];
+ if (!(o instanceof Byte)) {
+ throw new IllegalTypeConversionException("Cannot
convert non-Byte element in Object[] to binary for field '" + fieldName + "'");
}
+ bytes[elementIndex] = (Byte) o;
+ }
- return new GenericData.Fixed(fieldSchema, rawBytes);
- } else {
- return convertByteArray((Object[]) rawValue);
+ if (fieldSchema.getType() == Type.FIXED) {
+ final int expectedSize = fieldSchema.getFixedSize();
+ if (bytes.length != expectedSize) {
+ throw new IllegalTypeConversionException("Cannot
convert Object[] of length " + bytes.length +
+ " to FIXED(" + expectedSize + ") for field '"
+ fieldName + "'");
+ }
+ return new GenericData.Fixed(fieldSchema, bytes);
}
+ return ByteBuffer.wrap(bytes);
}
try {
if (rawValue instanceof Blob blob) {
final InputStream binaryStream =
blob.getBinaryStream();
final byte[] bytes = binaryStream.readAllBytes();
+ if (fieldSchema.getType() == Type.FIXED) {
+ final int expectedSize =
fieldSchema.getFixedSize();
+ if (bytes.length != expectedSize) {
+ throw new
IllegalTypeConversionException("Cannot convert Blob of length " + bytes.length +
+ " to FIXED(" + expectedSize + ") for field
'" + fieldName + "'");
+ }
+ return new GenericData.Fixed(fieldSchema, bytes);
+ }
return ByteBuffer.wrap(bytes);
} else {
throw new IllegalTypeConversionException("Cannot
convert value " + rawValue + " of type " + rawValue.getClass() + " to a
ByteBuffer");
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 8d8e88ce91..3c12109ab0 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -804,8 +804,9 @@ public class TestAvroTypeUtil {
final Schema fixedSchema = Schema.createFixed("blob", "blob",
NAMESPACE, bytes.length);
final Object converted = AvroTypeUtil.convertToAvroObject(blob,
fixedSchema, StandardCharsets.UTF_8);
- assertInstanceOf(ByteBuffer.class, converted);
- assertEquals(inputBuffer, converted);
+ assertInstanceOf(GenericFixed.class, converted);
+ final GenericFixed genericFixed = (GenericFixed) converted;
+ assertEquals(inputBuffer, ByteBuffer.wrap(genericFixed.bytes()));
}
@Test
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index b69f9b67cc..24ca439fdd 100755
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -190,6 +190,8 @@
<exclude>src/test/resources/avro/avro_schemaless_decimal.avsc</exclude>
<exclude>src/test/resources/avro/datatypes.avsc</exclude>
<exclude>src/test/resources/avro/decimals.avsc</exclude>
+
<exclude>src/test/resources/avro/decimals-bytes-fixed.avsc</exclude>
+ <exclude>src/test/resources/avro/fixed16.avsc</exclude>
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 385b9d704b..59dfca7341 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -20,8 +20,12 @@ package org.apache.nifi.avro;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -34,6 +38,7 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
+import
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
@@ -52,16 +57,19 @@ import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public abstract class TestWriteAvroResult {
@@ -323,6 +331,114 @@ public abstract class TestWriteAvroResult {
}
}
+ @Test
+ public void testFixedFieldWritesAsGenericFixed() throws IOException {
+ // Avro schema with a single fixed(16) field
+ final Schema schema = new Schema.Parser().parse(new
File("src/test/resources/avro/fixed16.avsc"));
+
+ // NiFi record schema: represent FIXED as an array of BYTEs
+ final List<RecordField> fields = Collections.singletonList(
+ new RecordField("fixed",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ // 16-byte value (e.g., UUID bytes). Use NiFi's typical byte[] ->
Object[]
+ // conversion path
+ final byte[] bytes = new byte[16];
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] = (byte) i;
+ }
+ final Map<String, Object> values = new HashMap<>();
+ values.put("fixed", AvroTypeUtil.convertByteArray(bytes));
+ final Record record = new MapRecord(recordSchema, values);
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (final RecordSetWriter writer = new
WriteAvroResultWithSchema(schema, out, CodecFactory.nullCodec())) {
+ writer.write(RecordSet.of(record.getSchema(), record));
+ }
+
+ // Read back and verify Fixed value is written as GenericFixed with
matching
+ // bytes
+ final byte[] written = out.toByteArray();
+ try (final DataFileStream<GenericRecord> dfs = new
DataFileStream<>(new ByteArrayInputStream(written), new
GenericDatumReader<>())) {
+ final GenericRecord rec = dfs.next();
+ assertNotNull(rec);
+ final Object fixedObj = rec.get("fixed");
+ assertInstanceOf(GenericFixed.class, fixedObj);
+ final byte[] actual = ((GenericFixed) fixedObj).bytes();
+ assertArrayEquals(bytes, actual);
+ }
+ }
+
+ @Test
+ public void testFixedFieldWrongLength() throws IOException {
+ final Schema schema = new Schema.Parser().parse(new
File("src/test/resources/avro/fixed16.avsc"));
+
+ final List<RecordField> fields = Collections.singletonList(
+ new RecordField("fixed",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ // Wrong length: 15 bytes instead of required 16
+ final byte[] bytes = new byte[15];
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] = (byte) i;
+ }
+ final Map<String, Object> values = new HashMap<>();
+ values.put("fixed", AvroTypeUtil.convertByteArray(bytes));
+ final Record record = new MapRecord(recordSchema, values);
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (final RecordSetWriter writer = new
WriteAvroResultWithSchema(schema, out, CodecFactory.nullCodec())) {
+ assertThrows(IllegalTypeConversionException.class, () ->
writer.write(RecordSet.of(record.getSchema(), record)));
+ }
+ }
+
+ @Test
+ public void testDecimalBytesAndFixedRoundTrip() throws IOException {
+ final Schema schema = new Schema.Parser().parse(new
File("src/test/resources/avro/decimals-bytes-fixed.avsc"));
+
+ // Both fields are DECIMAL(9,2) in NiFi's record schema
+ final List<RecordField> fields = Arrays.asList(
+ new RecordField("dec_bytes",
RecordFieldType.DECIMAL.getDecimalDataType(9, 2)),
+ new RecordField("dec_fixed",
RecordFieldType.DECIMAL.getDecimalDataType(9, 2)));
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ final BigDecimal expected = new BigDecimal("12345.67");
+ final Map<String, Object> values = new HashMap<>();
+ values.put("dec_bytes", expected);
+ values.put("dec_fixed", expected);
+ final Record record = new MapRecord(recordSchema, values);
+
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (final RecordSetWriter writer = new
WriteAvroResultWithSchema(schema, out, CodecFactory.nullCodec())) {
+ writer.write(RecordSet.of(record.getSchema(), record));
+ }
+
+ // Read back the values and convert from logical decimal
representations
+ final byte[] written = out.toByteArray();
+ try (final DataFileStream<GenericRecord> dfs = new
DataFileStream<>(new ByteArrayInputStream(written), new
GenericDatumReader<>())) {
+ final GenericRecord rec = dfs.next();
+ assertNotNull(rec);
+
+ // BYTES logical decimal → ByteBuffer
+ final Schema bytesFieldSchema =
schema.getField("dec_bytes").schema();
+ final LogicalType bytesLogicalType =
bytesFieldSchema.getLogicalType();
+ final Object bytesObj = rec.get("dec_bytes");
+ assertInstanceOf(ByteBuffer.class, bytesObj);
+ final BigDecimal fromBytes = new
Conversions.DecimalConversion().fromBytes((ByteBuffer) bytesObj,
bytesFieldSchema, bytesLogicalType);
+
+ // FIXED logical decimal → GenericFixed
+ final Schema fixedFieldSchema =
schema.getField("dec_fixed").schema();
+ final LogicalType fixedLogicalType =
fixedFieldSchema.getLogicalType();
+ final Object fixedObj = rec.get("dec_fixed");
+ assertInstanceOf(GenericFixed.class, fixedObj);
+ final BigDecimal fromFixed = new
Conversions.DecimalConversion().fromFixed((GenericFixed) fixedObj,
fixedFieldSchema, fixedLogicalType);
+
+ assertEquals(expected, fromBytes);
+ assertEquals(expected, fromFixed);
+ }
+ }
+
protected void assertMatch(final Record record, final GenericRecord
avroRecord) {
for (final String fieldName : record.getSchema().getFieldNames()) {
Object avroValue = avroRecord.get(fieldName);
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals-bytes-fixed.avsc b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals-bytes-fixed.avsc
new file mode 100644
index 0000000000..8ce5b0b8f4
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals-bytes-fixed.avsc
@@ -0,0 +1,28 @@
+{
+ "namespace": "nifi",
+ "name": "decimal_types",
+ "type": "record",
+ "fields": [
+ {
+ "name": "dec_bytes",
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 9,
+ "scale": 2
+ }
+ },
+ {
+ "name": "dec_fixed",
+ "type": {
+ "type": "fixed",
+ "name": "decfixed",
+ "size": 4,
+ "logicalType": "decimal",
+ "precision": 9,
+ "scale": 2
+ }
+ }
+ ]
+}
+
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/fixed16.avsc b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/fixed16.avsc
new file mode 100644
index 0000000000..396eee71d7
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/fixed16.avsc
@@ -0,0 +1,16 @@
+{
+ "namespace": "nifi",
+ "name": "data_types",
+ "type": "record",
+ "fields": [
+ {
+ "name": "fixed",
+ "type": {
+ "type": "fixed",
+ "name": "fixed16",
+ "size": 16
+ }
+ }
+ ]
+}
+