This is an automated email from the ASF dual-hosted git repository. mattyb149 pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new fd6e023039 NIFI-8135 allow CHOICE data types in conversion of Records to Java Maps fd6e023039 is described below commit fd6e0230392704ee81e95b2ff2b19f4fd3dbd774 Author: Chris Sampson <chris.sampso...@gmail.com> AuthorDate: Sat Sep 16 18:34:21 2023 +0100 NIFI-8135 allow CHOICE data types in conversion of Records to Java Maps Signed-off-by: Matt Burgess <mattyb...@apache.org> --- .../serialization/record/util/DataTypeUtils.java | 80 +++++++++++----------- .../serialization/record/TestDataTypeUtils.java | 64 ++++++++++++++--- .../nifi-jolt-record-processors/pom.xml | 2 + .../jolt/record/TestJoltTransformRecord.java | 33 ++++++++- .../TestJoltTransformRecord/flattenSpec.json | 21 ++++++ .../TestJoltTransformRecord/flattenedOutput.json | 21 ++++++ .../resources/TestJoltTransformRecord/input.json | 24 ++++--- 7 files changed, 183 insertions(+), 62 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 7464b28ee9..1d249eebd8 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -806,6 +806,7 @@ public class DataTypeUtils { for (final Object key : original.keySet()) { if (!(key instanceof String)) { keysAreStrings = false; + break; } } @@ -854,79 +855,82 @@ public class DataTypeUtils { */ @SuppressWarnings({"unchecked", "rawtypes"}) public static Object convertRecordFieldtoObject(final Object value, final DataType dataType) { - if (value == null) { return null; } + DataType chosenDataType; + if (dataType instanceof ChoiceDataType) { + final DataType chosen = chooseDataType(value, (ChoiceDataType) dataType); + chosenDataType = chosen != null ? chosen : dataType; + } else { + chosenDataType = dataType; + } + if (value instanceof Record) { - Record record = (Record) value; - RecordSchema recordSchema = record.getSchema(); + final Record record = (Record) value; + final RecordSchema recordSchema = record.getSchema(); if (recordSchema == null) { throw new IllegalTypeConversionException("Cannot convert value of type Record to Map because Record does not have an associated Schema"); } - final Map<String, Object> recordMap = new LinkedHashMap<>(); - for (RecordField field : recordSchema.getFields()) { - final DataType fieldDataType = field.getDataType(); + final Map<String, Object> recordMap = new LinkedHashMap<>(record.getRawFieldNames().size(), 1); + for (final RecordField field : recordSchema.getFields()) { final String fieldName = field.getFieldName(); - Object fieldValue = record.getValue(fieldName); + final Object fieldValue = record.getValue(fieldName); + if (field.getDataType() instanceof ChoiceDataType) { + final DataType chosen = chooseDataType(fieldValue, (ChoiceDataType) field.getDataType()); + chosenDataType = chosen != null ? chosen : field.getDataType(); + } else { + chosenDataType = field.getDataType(); + } if (fieldValue == null) { recordMap.put(fieldName, null); - } else if (isScalarValue(fieldDataType, fieldValue)) { + } else if (isScalarValue(chosenDataType, fieldValue)) { recordMap.put(fieldName, fieldValue); - } else if (fieldDataType instanceof RecordDataType) { + } else if (chosenDataType instanceof RecordDataType) { Record nestedRecord = (Record) fieldValue; - recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, fieldDataType)); - } else if (fieldDataType instanceof MapDataType) { - recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType)fieldDataType).getValueType())); - - } else if (fieldDataType instanceof ArrayDataType) { - recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[])fieldValue, ((ArrayDataType) fieldDataType).getElementType())); + recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, chosenDataType)); + } else if (chosenDataType instanceof MapDataType) { + recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType) chosenDataType).getValueType())); + } else if (chosenDataType instanceof ArrayDataType) { + recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[]) fieldValue, ((ArrayDataType) chosenDataType).getElementType())); } else { - throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + fieldDataType.toString() + throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + chosenDataType + " to Map for field " + fieldName + " because the type is not supported"); } } return recordMap; } else if (value instanceof Map) { - return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType()); - } else if (dataType != null && isScalarValue(dataType, value)) { + return convertRecordMapToJavaMap((Map) value, ((MapDataType) chosenDataType).getValueType()); + } else if (chosenDataType != null && isScalarValue(chosenDataType, value)) { return value; - } else if (value instanceof Object[] && dataType instanceof ArrayDataType) { + } else if (value instanceof Object[] && chosenDataType instanceof ArrayDataType) { // This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object - return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType()); + return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) chosenDataType).getElementType()); } throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported"); } - - public static Map<String, Object> convertRecordMapToJavaMap(final Map<String, Object> map, DataType valueDataType) { - + public static Map<String, Object> convertRecordMapToJavaMap(final Map<String, Object> map, final DataType valueDataType) { if (map == null) { return null; } - Map<String, Object> resultMap = new LinkedHashMap<>(); - for (Map.Entry<String, Object> entry : map.entrySet()) { + final Map<String, Object> resultMap = new LinkedHashMap<>(); + for (final Map.Entry<String, Object> entry : map.entrySet()) { resultMap.put(entry.getKey(), convertRecordFieldtoObject(entry.getValue(), valueDataType)); } return resultMap; } - public static Object[] convertRecordArrayToJavaArray(final Object[] array, DataType elementDataType) { - - if (array == null || array.length == 0 || isScalarValue(elementDataType, array[0])) { + public static Object[] convertRecordArrayToJavaArray(final Object[] array, final DataType elementDataType) { + if (array == null || array.length == 0 || Arrays.stream(array).allMatch(o -> isScalarValue(elementDataType, o))) { return array; } else { - // Must be an array of complex types, build an array of converted values - Object[] resultArray = new Object[array.length]; - for (int i = 0; i < array.length; i++) { - resultArray[i] = convertRecordFieldtoObject(array[i], elementDataType); - } - return resultArray; + return Arrays.stream(array).map(o -> convertRecordFieldtoObject(o, elementDataType)).toArray(); } } @@ -1089,7 +1093,7 @@ public class DataTypeUtils { if(dataType.getEnums() != null && dataType.getEnums().contains(value)) { return value.toString(); } - throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType.toString() + " for field " + fieldName); + throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType + " for field " + fieldName); } public static java.sql.Date toDate(final Object value, final Supplier<DateFormat> format, final String fieldName) { @@ -1933,11 +1937,7 @@ public class DataTypeUtils { return true; } - if (!Objects.equals(thisField.getDefaultValue(), otherField.getDefaultValue())) { - return true; - } - - return false; + return !Objects.equals(thisField.getDefaultValue(), otherField.getDefaultValue()); } public static RecordField merge(final RecordField thisField, final RecordField otherField) { diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java index 16a1a64128..7b11fd38bc 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java @@ -232,6 +232,30 @@ public class TestDataTypeUtils { } } + @Test + void testConvertRecordFieldToObjectWithNestedRecord() { + final Record record = DataTypeUtils.toRecord(new LinkedHashMap<String, Object>(){{ + put("firstName", "John"); + put("age", 30); + put("addresses", new Object[] {"some string", DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses")}); + }}, ""); + + final Object obj = DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getDataType()); + assertTrue(obj instanceof Map); + final Map<String, Object> map = (Map<String, Object>) obj; + assertEquals("John", map.get("firstName")); + assertEquals(30, map.get("age")); + + assertTrue(map.get("addresses") instanceof Object[]); + final Object[] objArray = (Object[]) map.get("addresses"); + assertEquals(2, objArray.length); + assertEquals("some string", objArray[0]); + + assertTrue(objArray[1] instanceof Map); + final Map<String, Object> addressMap = (Map<String, Object>) objArray[1]; + assertEquals("123 Fake Street", addressMap.get("address_1")); + } + @Test @SuppressWarnings("unchecked") public void testConvertRecordFieldToObject() { @@ -243,12 +267,18 @@ public class TestDataTypeUtils { fields.add(new RecordField("noDefault", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType()))); fields.add(new RecordField("intField", RecordFieldType.INT.getDataType())); fields.add(new RecordField("intArray", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); + fields.add(new RecordField("objArray", RecordFieldType.ARRAY.getArrayDataType( + RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType()) + ))); + fields.add(new RecordField("choiceArray", RecordFieldType.ARRAY.getArrayDataType( + RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())) + ))); // Map of Records with Arrays - List<RecordField> nestedRecordFields = new ArrayList<>(); + final List<RecordField> nestedRecordFields = new ArrayList<>(); nestedRecordFields.add(new RecordField("a", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); nestedRecordFields.add(new RecordField("b", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))); - RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields); + final RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields); fields.add(new RecordField("complex", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(nestedRecordSchema)))); @@ -257,6 +287,9 @@ public class TestDataTypeUtils { values.put("noDefault", "world"); values.put("intField", 5); values.put("intArray", new Integer[] {3,2,1}); + values.put("objArray", new Object[] {3,"2","abc",1}); + values.put("noChoiceArray", new Object[] {"foo","BAR"}); + values.put("choiceArray", new Object[] {"foo",new Object[]{"bar","baz"}}); final Map<String, Object> complexValues = new HashMap<>(); final Map<String, Object> complexValueRecord1 = new HashMap<>(); @@ -275,22 +308,38 @@ public class TestDataTypeUtils { Object o = DataTypeUtils.convertRecordFieldtoObject(inputRecord, RecordFieldType.RECORD.getRecordDataType(schema)); assertTrue(o instanceof Map); - Map<String,Object> outputMap = (Map<String,Object>) o; + final Map<String,Object> outputMap = (Map<String,Object>) o; assertEquals("hello", outputMap.get("defaultOfHello")); assertEquals("world", outputMap.get("noDefault")); o = outputMap.get("intField"); assertEquals(5,o); o = outputMap.get("intArray"); assertTrue(o instanceof Integer[]); - Integer[] intArray = (Integer[])o; + final Integer[] intArray = (Integer[])o; assertEquals(3, intArray.length); assertEquals((Integer)3, intArray[0]); + o = outputMap.get("objArray"); + assertTrue(o instanceof Object[]); + final Object[] objArray = (Object[])o; + assertEquals(4, objArray.length); + assertEquals(3, objArray[0]); + assertEquals("2", objArray[1]); + o = outputMap.get("choiceArray"); + assertTrue(o instanceof Object[]); + final Object[] choiceArray = (Object[])o; + assertEquals(2, choiceArray.length); + assertEquals("foo", choiceArray[0]); + assertTrue(choiceArray[1] instanceof Object[]); + final Object[] strArray = (Object[]) choiceArray[1]; + assertEquals(2, strArray.length); + assertEquals("bar", strArray[0]); + assertEquals("baz", strArray[1]); o = outputMap.get("complex"); assertTrue(o instanceof Map); - Map<String,Object> nestedOutputMap = (Map<String,Object>)o; + final Map<String,Object> nestedOutputMap = (Map<String,Object>)o; o = nestedOutputMap.get("complex1"); assertTrue(o instanceof Map); - Map<String,Object> complex1 = (Map<String,Object>)o; + final Map<String,Object> complex1 = (Map<String,Object>)o; o = complex1.get("a"); assertTrue(o instanceof Integer[]); assertEquals((Integer)2, ((Integer[])o)[1]); @@ -299,14 +348,13 @@ public class TestDataTypeUtils { assertEquals((Integer)3, ((Integer[])o)[2]); o = nestedOutputMap.get("complex2"); assertTrue(o instanceof Map); - Map<String,Object> complex2 = (Map<String,Object>)o; + final Map<String,Object> complex2 = (Map<String,Object>)o; o = complex2.get("a"); assertTrue(o instanceof String[]); assertEquals("hello", ((String[])o)[0]); o = complex2.get("b"); assertTrue(o instanceof String[]); assertEquals("4", ((String[])o)[1]); - } @Test diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml index b869e2414f..5c98a168d6 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml @@ -111,6 +111,8 @@ <exclude>src/test/resources/TestJoltTransformRecord/cardrOutput.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/defaultrSpec.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/defaultrOutput.json</exclude> + <exclude>src/test/resources/TestJoltTransformRecord/flattenSpec.json</exclude> + <exclude>src/test/resources/TestJoltTransformRecord/flattenedOutput.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/shiftrSpec.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json</exclude> <exclude>src/test/resources/TestJoltTransformRecord/sortrOutput.json</exclude> diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java index ef784afbc5..4dbbb4c685 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java @@ -18,9 +18,11 @@ package org.apache.nifi.processors.jolt.record; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.Relationship; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -689,6 +691,34 @@ public class TestJoltTransformRecord { runner.assertNotValid(); } + @Test + public void testJoltComplexChoiceField() throws Exception { + final JsonTreeReader reader = new JsonTreeReader(); + runner.addControllerService("reader", reader); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + runner.enableControllerService(reader); + runner.setProperty(JoltTransformRecord.RECORD_READER, "reader"); + + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + + final String flattenSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CHAINR); + + final String inputJson = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/input.json"))); + runner.enqueue(inputJson); + + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/flattenedOutput.json"))), + new String(transformed.toByteArray())); + } + private static Stream<Arguments> getChainrArguments() { return Stream.of( Arguments.of(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"), "has no single line comments"), @@ -696,7 +726,6 @@ public class TestJoltTransformRecord { } private void generateTestData(int numRecords, final BiFunction<Integer, MockRecordParser, Void> recordGenerator) { - if (recordGenerator == null) { final RecordSchema primarySchema = new SimpleRecordSchema(Arrays.asList( new RecordField("value", RecordFieldType.INT.getDataType()))); @@ -733,8 +762,6 @@ public class TestJoltTransformRecord { parser.addRecord(ratingRecord); } - - } else { recordGenerator.apply(numRecords, parser); } diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json new file mode 100644 index 0000000000..64076a65ca --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json @@ -0,0 +1,21 @@ +[ + { + "operation": "shift", + "spec": { "*": "record.&" } + }, + { + "operation": "shift", + "spec": { + "record": { + "*": { + "$": "TValue[#2].name", + "@": "TValue[#2].value" + } + } + } + }, + { + "operation": "default", + "spec": { "TValue[]": { "*": { "class": "unclass" } } } + } +] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json new file mode 100644 index 0000000000..eec23144df --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json @@ -0,0 +1,21 @@ +[ { + "TValue" : [ { + "name" : "datetime", + "value" : "2023-10-06 20:36:09.937019+00:00", + "class" : "unclass" + }, { + "name" : "Eta", + "value" : "", + "class" : "unclass" + } ] +}, { + "TValue" : [ { + "name" : "datetime", + "value" : "2023-08-24 17:07:03.334170+00:00", + "class" : "unclass" + }, { + "name" : "Eta", + "value" : "{Day=15, Hour=6, Minute=0, Month=8}", + "class" : "unclass" + } ] +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json index 12d85dbe98..6c64a8539b 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json @@ -1,13 +1,15 @@ -{ - "rating": { - "primary": { - "value": 3 - }, - "series": { - "value": [5,4] - }, - "quality": { - "value": 3 +[ + { + "datetime": "2023-10-06 20:36:09.937019+00:00", + "Eta": "" + }, + { + "datetime": "2023-08-24 17:07:03.334170+00:00", + "Eta": { + "Day": 15, + "Hour": 6, + "Minute": 0, + "Month": 8 } } -} \ No newline at end of file +] \ No newline at end of file