This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bf03af26d6 NIFI-14897 Fixed Avro FIXED serialization to use GenericFixed (#10266)
bf03af26d6 is described below

commit bf03af26d65d4be9c6d7f1302acd71afdb51175d
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Sep 5 04:49:41 2025 +0200

    NIFI-14897 Fixed Avro FIXED serialization to use GenericFixed (#10266)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    |  58 ++++++++---
 .../org/apache/nifi/avro/TestAvroTypeUtil.java     |   5 +-
 .../nifi-record-serialization-services/pom.xml     |   2 +
 .../org/apache/nifi/avro/TestWriteAvroResult.java  | 116 +++++++++++++++++++++
 .../test/resources/avro/decimals-bytes-fixed.avsc  |  28 +++++
 .../src/test/resources/avro/fixed16.avsc           |  16 +++
 6 files changed, 211 insertions(+), 14 deletions(-)

diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 48c93d6cfa..41bff6a99a 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -775,32 +775,66 @@ public class AvroTypeUtil {
                     final BigDecimal decimal = rawDecimal.scale() == 
desiredScale
                         ? rawDecimal : rawDecimal.setScale(desiredScale, 
RoundingMode.HALF_UP);
                     return fieldSchema.getType() == Type.BYTES
-                        ? new Conversions.DecimalConversion().toBytes(decimal, 
fieldSchema, logicalType) //return GenericByte
-                        : new Conversions.DecimalConversion().toFixed(decimal, 
fieldSchema, logicalType); //return GenericFixed
+                        ? new Conversions.DecimalConversion().toBytes(decimal, 
fieldSchema, logicalType)
+                        : new Conversions.DecimalConversion().toFixed(decimal, 
fieldSchema, logicalType);
                 }
                 if (rawValue instanceof byte[]) {
-                    return ByteBuffer.wrap((byte[]) rawValue);
+                    final byte[] bytes = (byte[]) rawValue;
+                    if (fieldSchema.getType() == Type.FIXED) {
+                        final int expectedSize = fieldSchema.getFixedSize();
+                        if (bytes.length != expectedSize) {
+                            throw new IllegalTypeConversionException("Cannot 
convert byte[] of length " + bytes.length +
+                                " to FIXED(" + expectedSize + ") for field '" 
+ fieldName + "'");
+                        }
+                        return new GenericData.Fixed(fieldSchema, bytes);
+                    }
+                    return ByteBuffer.wrap(bytes);
                 }
                 if (rawValue instanceof String) {
-                    return ByteBuffer.wrap(((String) 
rawValue).getBytes(charset));
+                    final byte[] bytes = ((String) rawValue).getBytes(charset);
+                    if (fieldSchema.getType() == Type.FIXED) {
+                        final int expectedSize = fieldSchema.getFixedSize();
+                        if (bytes.length != expectedSize) {
+                            throw new IllegalTypeConversionException("Cannot 
convert String bytes of length " + bytes.length +
+                                " to FIXED(" + expectedSize + ") for field '" 
+ fieldName + "'");
+                        }
+                        return new GenericData.Fixed(fieldSchema, bytes);
+                    }
+                    return ByteBuffer.wrap(bytes);
                 }
                 if (rawValue instanceof Object[]) {
-                    if (fieldSchema.getType() == Type.FIXED && 
"INT96".equals(fieldSchema.getName())) {
-                        Object[] rawObjects = (Object[]) rawValue;
-                        byte[] rawBytes = new byte[rawObjects.length];
-                        for (int elementIndex = 0; elementIndex < 
rawObjects.length; elementIndex++) {
-                            rawBytes[elementIndex] = (Byte) 
rawObjects[elementIndex];
+                    final Object[] rawObjects = (Object[]) rawValue;
+                    final byte[] bytes = new byte[rawObjects.length];
+                    for (int elementIndex = 0; elementIndex < 
rawObjects.length; elementIndex++) {
+                        final Object o = rawObjects[elementIndex];
+                        if (!(o instanceof Byte)) {
+                            throw new IllegalTypeConversionException("Cannot 
convert non-Byte element in Object[] to binary for field '" + fieldName + "'");
                         }
+                        bytes[elementIndex] = (Byte) o;
+                    }
 
-                        return new GenericData.Fixed(fieldSchema, rawBytes);
-                    } else {
-                        return convertByteArray((Object[]) rawValue);
+                    if (fieldSchema.getType() == Type.FIXED) {
+                        final int expectedSize = fieldSchema.getFixedSize();
+                        if (bytes.length != expectedSize) {
+                            throw new IllegalTypeConversionException("Cannot 
convert Object[] of length " + bytes.length +
+                                " to FIXED(" + expectedSize + ") for field '" 
+ fieldName + "'");
+                        }
+                        return new GenericData.Fixed(fieldSchema, bytes);
                     }
+                    return ByteBuffer.wrap(bytes);
                 }
                 try {
                     if (rawValue instanceof Blob blob) {
                         final InputStream binaryStream = 
blob.getBinaryStream();
                         final byte[] bytes = binaryStream.readAllBytes();
+                        if (fieldSchema.getType() == Type.FIXED) {
+                            final int expectedSize = 
fieldSchema.getFixedSize();
+                            if (bytes.length != expectedSize) {
+                                throw new 
IllegalTypeConversionException("Cannot convert Blob of length " + bytes.length +
+                                    " to FIXED(" + expectedSize + ") for field 
'" + fieldName + "'");
+                            }
+                            return new GenericData.Fixed(fieldSchema, bytes);
+                        }
                         return ByteBuffer.wrap(bytes);
                     } else {
                         throw new IllegalTypeConversionException("Cannot 
convert value " + rawValue + " of type " + rawValue.getClass() + " to a 
ByteBuffer");
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
index 8d8e88ce91..3c12109ab0 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -804,8 +804,9 @@ public class TestAvroTypeUtil {
 
         final Schema fixedSchema = Schema.createFixed("blob", "blob", 
NAMESPACE, bytes.length);
         final Object converted = AvroTypeUtil.convertToAvroObject(blob, 
fixedSchema, StandardCharsets.UTF_8);
-        assertInstanceOf(ByteBuffer.class, converted);
-        assertEquals(inputBuffer, converted);
+        assertInstanceOf(GenericFixed.class, converted);
+        final GenericFixed genericFixed = (GenericFixed) converted;
+        assertEquals(inputBuffer, ByteBuffer.wrap(genericFixed.bytes()));
     }
 
     @Test
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index b69f9b67cc..24ca439fdd 100755
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -190,6 +190,8 @@
                         
<exclude>src/test/resources/avro/avro_schemaless_decimal.avsc</exclude>
                         
<exclude>src/test/resources/avro/datatypes.avsc</exclude>
                         
<exclude>src/test/resources/avro/decimals.avsc</exclude>
+                        
<exclude>src/test/resources/avro/decimals-bytes-fixed.avsc</exclude>
+                        <exclude>src/test/resources/avro/fixed16.avsc</exclude>
                         
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
                         
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
                         
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 385b9d704b..59dfca7341 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -20,8 +20,12 @@ package org.apache.nifi.avro;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -34,6 +38,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
@@ -52,16 +57,19 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public abstract class TestWriteAvroResult {
 
@@ -323,6 +331,114 @@ public abstract class TestWriteAvroResult {
         }
     }
 
+    @Test
+    public void testFixedFieldWritesAsGenericFixed() throws IOException {
+        // Avro schema with a single fixed(16) field
+        final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/avro/fixed16.avsc"));
+
+        // NiFi record schema: represent FIXED as an array of BYTEs
+        final List<RecordField> fields = Collections.singletonList(
+                new RecordField("fixed", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        // 16-byte value (e.g., UUID bytes). Use NiFi's typical byte[] -> 
Object[]
+        // conversion path
+        final byte[] bytes = new byte[16];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = (byte) i;
+        }
+        final Map<String, Object> values = new HashMap<>();
+        values.put("fixed", AvroTypeUtil.convertByteArray(bytes));
+        final Record record = new MapRecord(recordSchema, values);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (final RecordSetWriter writer = new 
WriteAvroResultWithSchema(schema, out, CodecFactory.nullCodec())) {
+            writer.write(RecordSet.of(record.getSchema(), record));
+        }
+
+        // Read back and verify Fixed value is written as GenericFixed with 
matching
+        // bytes
+        final byte[] written = out.toByteArray();
+        try (final DataFileStream<GenericRecord> dfs = new 
DataFileStream<>(new ByteArrayInputStream(written), new 
GenericDatumReader<>())) {
+            final GenericRecord rec = dfs.next();
+            assertNotNull(rec);
+            final Object fixedObj = rec.get("fixed");
+            assertInstanceOf(GenericFixed.class, fixedObj);
+            final byte[] actual = ((GenericFixed) fixedObj).bytes();
+            assertArrayEquals(bytes, actual);
+        }
+    }
+
+    @Test
+    public void testFixedFieldWrongLength() throws IOException {
+        final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/avro/fixed16.avsc"));
+
+        final List<RecordField> fields = Collections.singletonList(
+                new RecordField("fixed", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        // Wrong length: 15 bytes instead of required 16
+        final byte[] bytes = new byte[15];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = (byte) i;
+        }
+        final Map<String, Object> values = new HashMap<>();
+        values.put("fixed", AvroTypeUtil.convertByteArray(bytes));
+        final Record record = new MapRecord(recordSchema, values);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (final RecordSetWriter writer = new 
WriteAvroResultWithSchema(schema, out, CodecFactory.nullCodec())) {
+            assertThrows(IllegalTypeConversionException.class, () -> 
writer.write(RecordSet.of(record.getSchema(), record)));
+        }
+    }
+
+    @Test
+    public void testDecimalBytesAndFixedRoundTrip() throws IOException {
+        final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/avro/decimals-bytes-fixed.avsc"));
+
+        // Both fields are DECIMAL(9,2) in NiFi's record schema
+        final List<RecordField> fields = Arrays.asList(
+                new RecordField("dec_bytes", 
RecordFieldType.DECIMAL.getDecimalDataType(9, 2)),
+                new RecordField("dec_fixed", 
RecordFieldType.DECIMAL.getDecimalDataType(9, 2)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final BigDecimal expected = new BigDecimal("12345.67");
+        final Map<String, Object> values = new HashMap<>();
+        values.put("dec_bytes", expected);
+        values.put("dec_fixed", expected);
+        final Record record = new MapRecord(recordSchema, values);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (final RecordSetWriter writer = new 
WriteAvroResultWithSchema(schema, out, CodecFactory.nullCodec())) {
+            writer.write(RecordSet.of(record.getSchema(), record));
+        }
+
+        // Read back the values and convert from logical decimal 
representations
+        final byte[] written = out.toByteArray();
+        try (final DataFileStream<GenericRecord> dfs = new 
DataFileStream<>(new ByteArrayInputStream(written), new 
GenericDatumReader<>())) {
+            final GenericRecord rec = dfs.next();
+            assertNotNull(rec);
+
+            // BYTES logical decimal → ByteBuffer
+            final Schema bytesFieldSchema = 
schema.getField("dec_bytes").schema();
+            final LogicalType bytesLogicalType = 
bytesFieldSchema.getLogicalType();
+            final Object bytesObj = rec.get("dec_bytes");
+            assertInstanceOf(ByteBuffer.class, bytesObj);
+            final BigDecimal fromBytes = new 
Conversions.DecimalConversion().fromBytes((ByteBuffer) bytesObj, 
bytesFieldSchema, bytesLogicalType);
+
+            // FIXED logical decimal → GenericFixed
+            final Schema fixedFieldSchema = 
schema.getField("dec_fixed").schema();
+            final LogicalType fixedLogicalType = 
fixedFieldSchema.getLogicalType();
+            final Object fixedObj = rec.get("dec_fixed");
+            assertInstanceOf(GenericFixed.class, fixedObj);
+            final BigDecimal fromFixed = new 
Conversions.DecimalConversion().fromFixed((GenericFixed) fixedObj, 
fixedFieldSchema, fixedLogicalType);
+
+            assertEquals(expected, fromBytes);
+            assertEquals(expected, fromFixed);
+        }
+    }
+
     protected void assertMatch(final Record record, final GenericRecord 
avroRecord) {
         for (final String fieldName : record.getSchema().getFieldNames()) {
             Object avroValue = avroRecord.get(fieldName);
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals-bytes-fixed.avsc b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals-bytes-fixed.avsc
new file mode 100644
index 0000000000..8ce5b0b8f4
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/decimals-bytes-fixed.avsc
@@ -0,0 +1,28 @@
+{
+  "namespace": "nifi",
+  "name": "decimal_types",
+  "type": "record",
+  "fields": [
+    {
+      "name": "dec_bytes",
+      "type": {
+        "type": "bytes",
+        "logicalType": "decimal",
+        "precision": 9,
+        "scale": 2
+      }
+    },
+    {
+      "name": "dec_fixed",
+      "type": {
+        "type": "fixed",
+        "name": "decfixed",
+        "size": 4,
+        "logicalType": "decimal",
+        "precision": 9,
+        "scale": 2
+      }
+    }
+  ]
+}
+
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/fixed16.avsc b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/fixed16.avsc
new file mode 100644
index 0000000000..396eee71d7
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/fixed16.avsc
@@ -0,0 +1,16 @@
+{
+  "namespace": "nifi",
+  "name": "data_types",
+  "type": "record",
+  "fields": [
+    {
+      "name": "fixed",
+      "type": {
+        "type": "fixed",
+        "name": "fixed16",
+        "size": 16
+      }
+    }
+  ]
+}
+

Reply via email to