This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new ba2e24b68f NIFI-12745: Fix AvroReader silently dropping malformed records
ba2e24b68f is described below

commit ba2e24b68f036363f333e216ee968d999a59e268
Author: Rajmund Takacs <tak...@gmail.com>
AuthorDate: Tue Feb 6 16:03:56 2024 +0100

    NIFI-12745: Fix AvroReader silently dropping malformed records
    
    This closes #8361.
    
    Signed-off-by: Tamas Palfy <tpa...@apache.org>
---
 .../nifi-record-serialization-services/pom.xml     |   2 +
 .../nifi/avro/AvroReaderWithExplicitSchema.java    |   2 +-
 .../avro/TestAvroReaderWithExplicitSchema.java     |  54 ++++++++++++++++-----
 .../resources/avro/schemaless_simple_record.avro   | Bin 0 -> 9 bytes
 .../resources/avro/schemaless_simple_record.avsc   |  28 +++++++++++
 .../avro/schemaless_simple_record_extra_field.avsc |  24 +++++++++
 6 files changed, 96 insertions(+), 14 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index ebc8fd2f00..5e84a59faf 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -169,6 +169,8 @@
                         
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
                         <exclude>src/test/resources/avro/simple.avsc</exclude>
                         
<exclude>src/test/resources/avro/recursive.avsc</exclude>
+                        
<exclude>src/test/resources/avro/schemaless_simple_record.avsc</exclude>
+                        
<exclude>src/test/resources/avro/schemaless_simple_record_extra_field.avsc</exclude>
 
                         <exclude>src/test/resources/cef/empty-row.txt</exclude>
                         
<exclude>src/test/resources/cef/misformatted-row.txt</exclude>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
index ab20aad811..6f22123170 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
@@ -91,7 +91,7 @@ public class AvroReaderWithExplicitSchema extends 
AvroRecordReader {
         try {
             genericRecord = datumReader.read(null, decoder);
         } catch (final EOFException eof) {
-            return null;
+            throw new IOException("Was expecting more data, but reached EOF.", 
eof);
         }
 
         return genericRecord;
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
index 6a3a639161..9bbaa55bb7 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java
@@ -16,14 +16,10 @@
  */
 package org.apache.nifi.avro;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -32,11 +28,15 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
 
 public class TestAvroReaderWithExplicitSchema {
 
@@ -105,6 +105,34 @@ public class TestAvroReaderWithExplicitSchema {
         assertThrows(IOException.class, () -> new 
AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema));
     }
 
+    @Test
+    public void testAvroExplicitReaderWithSchemalessFileAndExplicitSchema() 
throws Exception {
+        AvroReaderWithExplicitSchema avroReader = 
createAvroReaderWithExplicitSchema(
+                "src/test/resources/avro/schemaless_simple_record.avro",
+                "src/test/resources/avro/schemaless_simple_record.avsc"
+        );
+
+        GenericData.Record expected = new GenericData.Record(new 
Schema.Parser().parse(new 
File("src/test/resources/avro/schemaless_simple_record.avsc")));
+        expected.put("field_1", 123);
+        expected.put("field_2", "44");
+        expected.put("field_3", 5);
+
+        GenericRecord actual1 = avroReader.nextAvroRecord();
+        assertEquals(expected, actual1);
+
+        GenericRecord actual2 = avroReader.nextAvroRecord();
+        assertNull(actual2);
+    }
+
+    @Test
+    public void 
testAvroExplicitReaderWithSchemalessFileAndWrongExplicitSchema() throws 
Exception {
+        AvroReaderWithExplicitSchema avroReader = 
createAvroReaderWithExplicitSchema(
+                "src/test/resources/avro/schemaless_simple_record.avro",
+                
"src/test/resources/avro/schemaless_simple_record_extra_field.avsc"
+        );
+        assertThrows(IOException.class, avroReader::nextAvroRecord);
+    }
+
     @Test
     public void 
testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize() 
throws Exception {
         // GIVEN
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro
new file mode 100644
index 0000000000..858062de33
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro
 differ
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc
new file mode 100644
index 0000000000..788b7b73c7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc
@@ -0,0 +1,28 @@
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [
+    {
+      "name": "field_1",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "field_2",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "field_3",
+      "type": [
+        "int",
+        "null"
+      ]
+    }
+  ]
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc
new file mode 100644
index 0000000000..276107a0e1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc
@@ -0,0 +1,24 @@
+{
+  "type":"record",
+  "name":"message_name",
+  "namespace":"message_namespace",
+  "fields":[
+    {
+      "name":"field_1",
+      "type":["long"]
+    },
+    {
+      "name":"field_2",
+      "type":["string"]
+    },
+    {
+      "name":"field_3",
+      "type":["int"]
+    },
+    {
+      "name":"extra",
+      "type":["long"]
+    }
+  ]
+}
+

Reply via email to