This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 7839493dfaa3e97e75c376c113dd1176519308b2
Author: Mark Payne <[email protected]>
AuthorDate: Fri Mar 6 11:45:59 2020 -0500

    Fixed bug in JsonRowRecordReader when reading a 'raw' record with a schema 
that indicates that a field should be a Map. Also updated unit test to 
explicitly define schema, since schema inference will never return a Map but 
rather a Record
---
 .../nifi/processors/standard/TestValidateRecord.java | 13 +++++++------
 .../nifi/json/AbstractJsonRowRecordReader.java       | 20 +++++++++++++++++++-
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
index 1b4b7e4..74c3a17 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
@@ -33,7 +33,6 @@ import org.apache.nifi.schema.inference.SchemaInferenceUtil;
 import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -53,7 +52,7 @@ import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
@@ -560,6 +559,7 @@ public class TestValidateRecord {
         final JsonTreeReader jsonReader = new JsonTreeReader();
         runner.addControllerService("reader", jsonReader);
         runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
+        runner.setProperty(jsonReader, 
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
         runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, 
validateSchema);
         runner.enableControllerService(jsonReader);
 
@@ -585,11 +585,12 @@ public class TestValidateRecord {
         try (final InputStream in = new ByteArrayInputStream(source); final 
AvroRecordReader reader = new AvroReaderWithEmbeddedSchema(in)) {
             final Object[] values = reader.nextRecord().getValues();
             assertEquals("uuid", values[0]);
-            assertEquals(2, ((HashMap<?,?>) values[1]).size());
+            assertEquals(2, ((Map<?,?>) values[1]).size());
             final Object[] data = (Object[]) values[2];
-            assertEquals(2, ( (HashMap<?,?>) ((MapRecord) 
data[0]).getValue("points")).size());
-            assertEquals(2, ( (HashMap<?,?>) ((MapRecord) 
data[1]).getValue("points")).size());
-            assertEquals(2, ( (HashMap<?,?>) ((MapRecord) 
data[2]).getValue("points")).size());
+            assertEquals(3, data.length);
+            assertEquals(2, ( (Map<?,?>) ((Record) 
data[0]).getValue("points")).size());
+            assertEquals(2, ( (Map<?,?>) ((Record) 
data[1]).getValue("points")).size());
+            assertEquals(2, ( (Map<?,?>) ((Record) 
data[2]).getValue("points")).size());
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index f163707..6acb0d7 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.codehaus.jackson.JsonFactory;
@@ -189,7 +190,24 @@ public abstract class AbstractJsonRowRecordReader 
implements RecordReader {
 
         if (fieldNode.isObject()) {
             RecordSchema childSchema = null;
-            if (dataType != null && RecordFieldType.RECORD == 
dataType.getFieldType()) {
+            if (dataType != null && RecordFieldType.MAP == 
dataType.getFieldType()) {
+                final MapDataType mapDataType = (MapDataType) dataType;
+                final DataType valueType = mapDataType.getValueType();
+
+                final Map<String, Object> mapValue = new HashMap<>();
+
+                final Iterator<Map.Entry<String, JsonNode>> fieldItr = 
fieldNode.getFields();
+                while (fieldItr.hasNext()) {
+                    final Map.Entry<String, JsonNode> entry = fieldItr.next();
+                    final String elementName = entry.getKey();
+                    final JsonNode elementNode = entry.getValue();
+
+                    final Object nodeValue = getRawNodeValue(elementNode, 
valueType, fieldName + "['" + elementName + "']");
+                    mapValue.put(elementName, nodeValue);
+                }
+
+                return mapValue;
+            } else if (dataType != null && RecordFieldType.RECORD == 
dataType.getFieldType()) {
                 final RecordDataType recordDataType = (RecordDataType) 
dataType;
                 childSchema = recordDataType.getChildSchema();
             } else if (dataType != null && RecordFieldType.CHOICE == 
dataType.getFieldType()) {

Reply via email to