This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new ba2e24b68f NIFI-12745: Fix AvroReader silently dropping malformed records ba2e24b68f is described below commit ba2e24b68f036363f333e216ee968d999a59e268 Author: Rajmund Takacs <tak...@gmail.com> AuthorDate: Tue Feb 6 16:03:56 2024 +0100 NIFI-12745: Fix AvroReader silently dropping malformed records This closes #8361. Signed-off-by: Tamas Palfy <tpa...@apache.org> --- .../nifi-record-serialization-services/pom.xml | 2 + .../nifi/avro/AvroReaderWithExplicitSchema.java | 2 +- .../avro/TestAvroReaderWithExplicitSchema.java | 54 ++++++++++++++++----- .../resources/avro/schemaless_simple_record.avro | Bin 0 -> 9 bytes .../resources/avro/schemaless_simple_record.avsc | 28 +++++++++++ .../avro/schemaless_simple_record_extra_field.avsc | 24 +++++++++ 6 files changed, 96 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index ebc8fd2f00..5e84a59faf 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -169,6 +169,8 @@ <exclude>src/test/resources/avro/multiple-types.avsc</exclude> <exclude>src/test/resources/avro/simple.avsc</exclude> <exclude>src/test/resources/avro/recursive.avsc</exclude> + <exclude>src/test/resources/avro/schemaless_simple_record.avsc</exclude> + <exclude>src/test/resources/avro/schemaless_simple_record_extra_field.avsc</exclude> <exclude>src/test/resources/cef/empty-row.txt</exclude> <exclude>src/test/resources/cef/misformatted-row.txt</exclude> diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java index ab20aad811..6f22123170 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java @@ -91,7 +91,7 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader { try { genericRecord = datumReader.read(null, decoder); } catch (final EOFException eof) { - return null; + throw new IOException("Was expecting more data, but reached EOF.", eof); } return genericRecord; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java index 6a3a639161..9bbaa55bb7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java @@ -16,14 +16,10 @@ */ package org.apache.nifi.avro; -import org.apache.avro.Schema; 
-import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordSchema; -import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.File; import java.io.FileInputStream; @@ -32,11 +28,15 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.Test; public class TestAvroReaderWithExplicitSchema { @@ -105,6 +105,34 @@ public class TestAvroReaderWithExplicitSchema { assertThrows(IOException.class, () -> new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema)); } + @Test + public void testAvroExplicitReaderWithSchemalessFileAndExplicitSchema() throws Exception { + AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema( + "src/test/resources/avro/schemaless_simple_record.avro", + 
"src/test/resources/avro/schemaless_simple_record.avsc" + ); + + GenericData.Record expected = new GenericData.Record(new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record.avsc"))); + expected.put("field_1", 123); + expected.put("field_2", "44"); + expected.put("field_3", 5); + + GenericRecord actual1 = avroReader.nextAvroRecord(); + assertEquals(expected, actual1); + + GenericRecord actual2 = avroReader.nextAvroRecord(); + assertNull(actual2); + } + + @Test + public void testAvroExplicitReaderWithSchemalessFileAndWrongExplicitSchema() throws Exception { + AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema( + "src/test/resources/avro/schemaless_simple_record.avro", + "src/test/resources/avro/schemaless_simple_record_extra_field.avsc" + ); + assertThrows(IOException.class, avroReader::nextAvroRecord); + } + @Test public void testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize() throws Exception { // GIVEN diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro new file mode 100644 index 0000000000..858062de33 Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro differ diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc new file mode 100644 index 
0000000000..788b7b73c7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc @@ -0,0 +1,28 @@ +{ + "type": "record", + "name": "nifiRecord", + "namespace": "org.apache.nifi", + "fields": [ + { + "name": "field_1", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_2", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_3", + "type": [ + "int", + "null" + ] + } + ] +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc new file mode 100644 index 0000000000..276107a0e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc @@ -0,0 +1,24 @@ +{ + "type":"record", + "name":"message_name", + "namespace":"message_namespace", + "fields":[ + { + "name":"field_1", + "type":["long"] + }, + { + "name":"field_2", + "type":["string"] + }, + { + "name":"field_3", + "type":["int"] + }, + { + "name":"extra", + "type":["long"] + } + ] +} +