This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.11.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 7839493dfaa3e97e75c376c113dd1176519308b2 Author: Mark Payne <[email protected]> AuthorDate: Fri Mar 6 11:45:59 2020 -0500 Fixed bug in JsonRowRecordReader when reading a 'raw' record with a schema that indicates that a field should be a Map. Also updated unit test to explicitly define schema, since schema inference will never return a Map but rather a Record --- .../nifi/processors/standard/TestValidateRecord.java | 13 +++++++------ .../nifi/json/AbstractJsonRowRecordReader.java | 20 +++++++++++++++++++- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java index 1b4b7e4..74c3a17 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java @@ -33,7 +33,6 @@ import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.DateTimeUtils; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; @@ -53,7 +52,7 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.HashMap; +import java.util.Map; import java.util.Optional; import static org.junit.Assert.assertEquals; @@ -560,6 +559,7 @@ public class TestValidateRecord { final JsonTreeReader jsonReader = new JsonTreeReader(); runner.addControllerService("reader", jsonReader); runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property"); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema); runner.enableControllerService(jsonReader); @@ -585,11 +585,12 @@ public class TestValidateRecord { try (final InputStream in = new ByteArrayInputStream(source); final AvroRecordReader reader = new AvroReaderWithEmbeddedSchema(in)) { final Object[] values = reader.nextRecord().getValues(); assertEquals("uuid", values[0]); - assertEquals(2, ((HashMap<?,?>) values[1]).size()); + assertEquals(2, ((Map<?,?>) values[1]).size()); final Object[] data = (Object[]) values[2]; - assertEquals(2, ( (HashMap<?,?>) ((MapRecord) data[0]).getValue("points")).size()); - assertEquals(2, ( (HashMap<?,?>) ((MapRecord) data[1]).getValue("points")).size()); - assertEquals(2, ( (HashMap<?,?>) ((MapRecord) data[2]).getValue("points")).size()); + assertEquals(3, data.length); + assertEquals(2, ( (Map<?,?>) ((Record) data[0]).getValue("points")).size()); + assertEquals(2, ( (Map<?,?>) ((Record) data[1]).getValue("points")).size()); + assertEquals(2, ( (Map<?,?>) ((Record) data[2]).getValue("points")).size()); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index f163707..6acb0d7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -28,6 +28,7 @@ import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.MapDataType; import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.codehaus.jackson.JsonFactory; @@ -189,7 +190,24 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { if (fieldNode.isObject()) { RecordSchema childSchema = null; - if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) { + if (dataType != null && RecordFieldType.MAP == dataType.getFieldType()) { + final MapDataType mapDataType = (MapDataType) dataType; + final DataType valueType = mapDataType.getValueType(); + + final Map<String, Object> mapValue = new HashMap<>(); + + final Iterator<Map.Entry<String, JsonNode>> fieldItr = fieldNode.getFields(); + while (fieldItr.hasNext()) { + final Map.Entry<String, JsonNode> entry = fieldItr.next(); + final String elementName = entry.getKey(); + final JsonNode elementNode = entry.getValue(); + + final Object nodeValue = getRawNodeValue(elementNode, valueType, fieldName + "['" + elementName + "']"); + mapValue.put(elementName, nodeValue); + } + + return mapValue; + } else if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) { final RecordDataType recordDataType = (RecordDataType) dataType; childSchema = recordDataType.getChildSchema(); } else if (dataType != null && RecordFieldType.CHOICE == dataType.getFieldType()) {
