This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 2d112871db NIFI-8134 allow unescapeJson Record Path function to recursively convert Maps to Records (#7745) 2d112871db is described below commit 2d112871db40c8f599d31974d14dd4a2227bf7da Author: Chris Sampson <12159006+chrissamo...@users.noreply.github.com> AuthorDate: Tue May 14 22:11:28 2024 +0100 NIFI-8134 allow unescapeJson Record Path function to recursively convert Maps to Records (#7745) * NIFI-8134 recursively convert Java Objects to NiFi Records --- .../nifi/record/path/functions/UnescapeJson.java | 17 +- .../nifi/record/path/paths/RecordPathCompiler.java | 12 +- .../apache/nifi/record/path/TestRecordPath.java | 183 ++++++++++++++------- .../serialization/record/util/DataTypeUtils.java | 73 ++++++-- nifi-docs/src/main/asciidoc/record-path-guide.adoc | 30 +++- .../nifi-standard-processors/pom.xml | 4 + .../nifi/processors/standard/TestUpdateRecord.java | 64 +++++++ .../TestUpdateRecord/input/organisation.json | 3 + .../TestUpdateRecord/output/organisation.json | 15 ++ .../organisation-with-departments-string.avsc | 8 + .../schema/organisation-with-departments.avsc | 78 +++++++++ 11 files changed, 400 insertions(+), 87 deletions(-) diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java index 35f7d93d3f..d5e821a44b 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java @@ -44,18 +44,23 @@ public class UnescapeJson extends RecordPathSegment { private final RecordPathSegment convertToRecordRecordPath; + private final RecordPathSegment recursiveRecordConversion; + private final ObjectMapper objectMapper = new ObjectMapper(); - public UnescapeJson(final RecordPathSegment 
recordPath, final RecordPathSegment convertToRecordRecordPath, final boolean absolute) { + public UnescapeJson(final RecordPathSegment recordPath, final RecordPathSegment convertToRecordRecordPath, final RecordPathSegment recursiveRecordConversion, final boolean absolute) { super("unescapeJson", null, absolute); this.recordPath = recordPath; this.convertToRecordRecordPath = convertToRecordRecordPath; + this.recursiveRecordConversion = recursiveRecordConversion; } @Override public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { final boolean convertMapToRecord = convertToRecordRecordPath != null && Boolean.parseBoolean(RecordPathUtils.getFirstStringValue(convertToRecordRecordPath, context)); + final boolean recursiveMapToRecord = recursiveRecordConversion != null + && Boolean.parseBoolean(RecordPathUtils.getFirstStringValue(recursiveRecordConversion, context)); final Stream<FieldValue> fieldValues = recordPath.evaluate(context); return fieldValues.filter(fv -> fv.getValue() != null) @@ -70,7 +75,7 @@ public class UnescapeJson extends RecordPathSegment { } return new StandardFieldValue( - convertFieldValue(value, fv.getField().getFieldName(), dataType, convertMapToRecord), + convertFieldValue(value, fv.getField().getFieldName(), dataType, convertMapToRecord, recursiveMapToRecord), fv.getField(), fv.getParent().orElse(null) ); } catch (IOException e) { @@ -83,7 +88,7 @@ public class UnescapeJson extends RecordPathSegment { } @SuppressWarnings("unchecked") - private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType, final boolean convertMapToRecord) throws IOException { + private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType, final boolean convertMapToRecord, final boolean recursiveMapToRecord) throws IOException { if (dataType instanceof RecordDataType) { // convert Maps to Records final Map<String, Object> map = objectMapper.readValue(value.toString(), 
Map.class); @@ -102,13 +107,13 @@ public class UnescapeJson extends RecordPathSegment { final Object parsed = objectMapper.readValue(value.toString(), Object.class); if (convertMapToRecord) { if (DataTypeUtils.isCompatibleDataType(parsed, RecordFieldType.RECORD.getDataType())) { - return DataTypeUtils.toRecord(parsed, fieldName); + return DataTypeUtils.toRecord(parsed, fieldName, recursiveMapToRecord); } else if (DataTypeUtils.isArrayTypeCompatible(parsed, RecordFieldType.RECORD.getDataType())) { - return Arrays.stream((Object[]) parsed).map(m -> DataTypeUtils.toRecord(m, fieldName)).toArray(Record[]::new); + return Arrays.stream((Object[]) parsed).map(m -> DataTypeUtils.toRecord(m, fieldName, recursiveMapToRecord)).toArray(Record[]::new); } else if (parsed instanceof Collection && !((Collection<Object>) parsed).isEmpty() && DataTypeUtils.isCompatibleDataType(((Collection<Object>) parsed).stream().findFirst().get(), RecordFieldType.RECORD.getDataType())) { - return ((Collection<Object>) parsed).stream().map(m -> DataTypeUtils.toRecord(m, fieldName)).collect(Collectors.toList()); + return ((Collection<Object>) parsed).stream().map(m -> DataTypeUtils.toRecord(m, fieldName, recursiveMapToRecord)).collect(Collectors.toList()); } } diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index 9061014295..a75b3376c0 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -352,13 +352,11 @@ public class RecordPathCompiler { case "unescapeJson": { final int numArgs = argumentListTree.getChildCount(); - if (numArgs == 1) { - final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); - return new 
UnescapeJson(args[0], null, absolute); - } else { - final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); - return new UnescapeJson(args[0], args[1], absolute); - } + final RecordPathSegment[] args = getArgPaths(argumentListTree, numArgs, functionName, absolute); + final RecordPathSegment convertToRecord = numArgs > 1 ? args[1] : null; + final RecordPathSegment recursiveConversion = numArgs > 2 ? args[2] : null; + + return new UnescapeJson(args[0], convertToRecord, recursiveConversion, absolute); } case "hash":{ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index f374325528..a548ccd812 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -17,6 +17,21 @@ package org.apache.nifi.record.path; +import org.apache.nifi.record.path.exception.RecordPathException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.uuid5.Uuid5Util; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + import java.nio.charset.IllegalCharsetNameException; import java.nio.charset.StandardCharsets; import 
java.sql.Date; @@ -27,28 +42,16 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.nifi.record.path.exception.RecordPathException; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.type.ArrayDataType; -import org.apache.nifi.serialization.record.util.DataTypeUtils; -import org.apache.nifi.uuid5.Uuid5Util; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -1852,25 +1855,26 @@ public class TestRecordPath { new RecordField("person", RecordFieldType.RECORD.getRecordDataType(person)) )); - final Map<String, Object> values = new HashMap<String, Object>(){{ - put("person", new MapRecord(person, new HashMap<String, Object>(){{ - put("firstName", "John"); - put("age", 30); - put("nicknames", new String[] {"J", "Johnny"}); - put("addresses", new MapRecord[]{ - new MapRecord(address, Collections.singletonMap("address_1", "123 Somewhere Street")), - new MapRecord(address, Collections.singletonMap("address_1", "456 Anywhere Road")) - }); - }})); - }}; + final Map<String, Object> values = Map.of( + "person", new MapRecord(person, Map.of( 
+ "firstName", "John", + "age", 30, + "nicknames", new String[] {"J", "Johnny"}, + "addresses", new MapRecord[]{ + new MapRecord(address, Collections.singletonMap("address_1", "123 Somewhere Street")), + new MapRecord(address, Collections.singletonMap("address_1", "456 Anywhere Road")) + } + ) ) + ); final Record record = new MapRecord(schema, values); - assertEquals("\"John\"", RecordPath.compile("escapeJson(/person/firstName)").evaluate(record).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()); - assertEquals("30", RecordPath.compile("escapeJson(/person/age)").evaluate(record).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()); + assertEquals("\"John\"", RecordPath.compile("escapeJson(/person/firstName)").evaluate(record).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()); + assertEquals("30", RecordPath.compile("escapeJson(/person/age)").evaluate(record).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()); assertEquals( - "{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}", - RecordPath.compile("escapeJson(/person)").evaluate(record).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + """ + {"firstName":"John","age":30,"nicknames":["J","Johnny"],"addresses":[{"address_1":"123 Somewhere Street"},{"address_1":"456 Anywhere Road"}]}""", + RecordPath.compile("escapeJson(/person)").evaluate(record).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() ); } @@ -1899,79 +1903,111 @@ public class TestRecordPath { final Record mapAddressesArray = new MapRecord(schema, Collections.singletonMap( "json_str", - "{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}") + """ + 
{"firstName":"John","age":30,"nicknames":["J","Johnny"],"addresses":[{"address_1":"123 Somewhere Street"},{"address_1":"456 Anywhere Road"}]}""") ); assertEquals( - new HashMap<String, Object>(){{ - put("firstName", "John"); - put("age", 30); - put("nicknames", Arrays.asList("J", "Johnny")); - put("addresses", Arrays.asList( + Map.of( + "firstName", "John", + "age", 30, + "nicknames", Arrays.asList("J", "Johnny"), + "addresses", Arrays.asList( Collections.singletonMap("address_1", "123 Somewhere Street"), Collections.singletonMap("address_1", "456 Anywhere Road") - )); - }}, - RecordPath.compile("unescapeJson(/json_str)").evaluate(mapAddressesArray).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + ) + ), + RecordPath.compile("unescapeJson(/json_str)").evaluate(mapAddressesArray).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() ); // test CHOICE resulting in nested single RECORD final Record mapAddressesSingle = new MapRecord(schema, Collections.singletonMap( "json_str", - "{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":{\"address_1\":\"123 Somewhere Street\"}}") + """ + {"firstName":"John","age":30,"nicknames":["J","Johnny"],"addresses":{"address_1":"123 Somewhere Street"}}""") ); assertEquals( - new HashMap<String, Object>(){{ - put("firstName", "John"); - put("age", 30); - put("nicknames", Arrays.asList("J", "Johnny")); - put("addresses", Collections.singletonMap("address_1", "123 Somewhere Street")); - }}, - RecordPath.compile("unescapeJson(/json_str, 'false')").evaluate(mapAddressesSingle).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + Map.of( + "firstName", "John", + "age", 30, + "nicknames", Arrays.asList("J", "Johnny"), + "addresses", Collections.singletonMap("address_1", "123 Somewhere Street") + ), + RecordPath.compile("unescapeJson(/json_str, 
'false')").evaluate(mapAddressesSingle).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() ); // test single Record converted from Map Object final Record recordFromMap = new MapRecord(schema, Collections.singletonMap( "json_str", - "{\"firstName\":\"John\",\"age\":30}") + """ + {"firstName":"John","age":30}""") ); + Map<String, Object> expectedMap = new LinkedHashMap<>(); + expectedMap.put("firstName", "John"); + expectedMap.put("age", 30); assertEquals( - DataTypeUtils.toRecord(new HashMap<String, Object>(){{ - put("firstName", "John"); - put("age", 30); - }}, "json_str"), - RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + DataTypeUtils.toRecord(expectedMap, "json_str"), + RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + + // test nested Record converted from Map Object + final Record nestedRecordFromMap = new MapRecord(schema, + Collections.singletonMap( + "json_str", + """ + {"firstName":"John","age":30,"addresses":[{"address_1":"123 Fake Street"}]}""") + ); + // recursively convert Maps to Records (addresses becomes and ARRAY or RECORDs) + expectedMap.put("addresses", new Object[] {DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses")}); + assertRecordsMatch( + DataTypeUtils.toRecord(expectedMap, "json_str"), + RecordPath.compile("unescapeJson(/json_str, 'true', 'true')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + // convert Map to Record, without recursion (addresses becomes an ARRAY, but contents are still Maps) + expectedMap.put("addresses", new Object[] {Collections.singletonMap("address_1", "123 Fake Street")}); + assertRecordsMatch( + DataTypeUtils.toRecord(expectedMap, "json_str"), + 
RecordPath.compile("unescapeJson(/json_str, 'true', 'false')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() + ); + // without Map conversion to Record (addresses remains a Collection, Maps are unchanged) + assertMapsMatch( + expectedMap, + RecordPath.compile("unescapeJson(/json_str, 'false')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue(), + false ); // test collection of Record converted from Map collection final Record recordCollectionFromMaps = new MapRecord(schema, Collections.singletonMap( "json_str", - "[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]") + """ + [{"address_1":"123 Somewhere Street"},{"address_1":"456 Anywhere Road"}]""") ); assertEquals( Arrays.asList( DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Somewhere Street"), "json_str"), DataTypeUtils.toRecord(Collections.singletonMap("address_1", "456 Anywhere Road"), "json_str") ), - RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordCollectionFromMaps).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordCollectionFromMaps).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() ); // test simple String field - final Record recordJustName = new MapRecord(schema, Collections.singletonMap("json_str", "{\"firstName\":\"John\"}")); + final Record recordJustName = new MapRecord(schema, Collections.singletonMap("json_str", + """ + {"firstName":"John"}""")); assertEquals( - new HashMap<String, Object>(){{put("firstName", "John");}}, - RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustName).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + Map.of("firstName", "John"), + 
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustName).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue() ); // test simple String final Record recordJustString = new MapRecord(schema, Collections.singletonMap("json_str", "\"John\"")); - assertEquals("John", RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustString).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()); + assertEquals("John", RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustString).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()); // test simple Int final Record recordJustInt = new MapRecord(schema, Collections.singletonMap("json_str", "30")); - assertEquals(30, RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustInt).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()); + assertEquals(30, RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustInt).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()); // test invalid JSON final Record recordInvalidJson = new MapRecord(schema, Collections.singletonMap("json_str", "{\"invalid\": \"json")); @@ -1979,7 +2015,7 @@ public class TestRecordPath { RecordPathException rpe = assertThrows(RecordPathException.class, () -> RecordPath.compile("unescapeJson(/json_str)") .evaluate(recordInvalidJson).getSelectedFields() - .findFirst().orElseThrow(IllegalStateException::new).getValue()); + .findFirst().orElseThrow(AssertionError::new).getValue()); assertEquals("Unable to deserialise JSON String into Record Path value", rpe.getMessage()); // test not String @@ -1987,10 +2023,35 @@ public class TestRecordPath { IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> RecordPath.compile("unescapeJson(/person/age)") .evaluate(recordNotString).getSelectedFields() - .findFirst().orElseThrow(IllegalStateException::new).getValue()); + 
.findFirst().orElseThrow(AssertionError::new).getValue()); assertEquals("Argument supplied to unescapeJson must be a String", iae.getMessage()); } + private void assertRecordsMatch(final Record expectedRecord, final Object result) { + assertInstanceOf(Record.class, result); + final Record resultRecord = (Record) result; + assertMapsMatch(expectedRecord.toMap(), resultRecord.toMap(), true); + } + + @SuppressWarnings("unchecked") + private void assertMapsMatch(final Map<String, Object> expectedMap, final Object result, final boolean convertMapToRecord) { + assertInstanceOf(Map.class, result); + final Map<String, Object> resultMap = (Map<String, Object>) result; + assertEquals(expectedMap.size(), resultMap.size()); + + for (final Map.Entry<String, Object> e : expectedMap.entrySet()) { + // can't directly assertEquals two Object[] as the #equals method checks whether they're the same Object, rather than comparing the array content + if (e.getValue() instanceof Object[] expectedArray) { + final Object resultObj = resultMap.get(e.getKey()); + // Record conversion changes Collections to Arrays, otherwise they remain Collections + final Object[] resultArray = convertMapToRecord ? 
(Object[]) resultObj : ((Collection<?>) resultObj).toArray(); + assertArrayEquals(expectedArray, resultArray); + } else { + assertEquals(e.getValue(), resultMap.get(e.getKey())); + } + } + } + @Test public void testHash() { final Record record = getCaseTestRecord(); diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index de4ddf27a7..30a7e2737d 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -59,6 +59,7 @@ import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.EnumMap; @@ -463,7 +464,11 @@ public class DataTypeUtils { } public static Record toRecord(final Object value, final String fieldName) { - return toRecord(value, fieldName, StandardCharsets.UTF_8); + return toRecord(value, fieldName, false); + } + + public static Record toRecord(final Object value, final String fieldName, final boolean recursive) { + return toRecord(value, fieldName, StandardCharsets.UTF_8, recursive); } public static RecordSchema inferSchema(final Map<String, Object> values, final String fieldName, final Charset charset) { @@ -472,7 +477,6 @@ public class DataTypeUtils { } final List<RecordField> inferredFieldTypes = new ArrayList<>(); - final Map<String, Object> coercedValues = new LinkedHashMap<>(); for (final Map.Entry<?, ?> entry : values.entrySet()) { final Object keyValue = entry.getKey(); @@ -487,21 +491,55 @@ public class DataTypeUtils { final RecordField recordField = new RecordField(key, inferredDataType, true); inferredFieldTypes.add(recordField); - final 
Object coercedValue = convertType(rawValue, inferredDataType, fieldName, charset); - coercedValues.put(key, coercedValue); + convertType(rawValue, inferredDataType, fieldName, charset); } - final RecordSchema inferredSchema = new SimpleRecordSchema(inferredFieldTypes); - return inferredSchema; + return new SimpleRecordSchema(inferredFieldTypes); } public static Record toRecord(final Object value, final String fieldName, final Charset charset) { + return toRecord(value, fieldName, charset, false); + } + + private static Object convertNestedObject(final Object rawValue, final String key, final Charset charset) { + final Object coercedValue; + if (rawValue instanceof Map<?, ?>) { + coercedValue = toRecord(rawValue, key, charset, true); + } else if (rawValue instanceof Object[]) { + final Object[] rawArray = (Object[]) rawValue; + final List<Object> objList = new ArrayList<>(rawArray.length); + for (final Object o : rawArray) { + objList.add(o instanceof Map<?, ?> ? toRecord(o, key, charset, true) : o); + } + coercedValue = objList.toArray(); + } else if (rawValue instanceof Collection<?>) { + final Collection<?> objCollection = (Collection<?>) rawValue; + // Records have ARRAY DataTypes, so convert any Collections + final List<Object> objList = new ArrayList<>(objCollection.size()); + for (final Object o : objCollection) { + objList.add(o instanceof Map<?, ?> ? 
toRecord(o, key, charset, true) : o); + } + coercedValue = objList.toArray(); + } else { + coercedValue = rawValue; + } + return coercedValue; + } + + public static Record toRecord(final Object value, final String fieldName, final Charset charset, final boolean recursive) { if (value == null) { return null; } if (value instanceof Record) { - return ((Record) value); + final Record record = ((Record) value); + if (recursive) { + record.getRawFieldNames().forEach(name -> { + final Object rawValue = record.getValue(name); + record.setValue(name, convertNestedObject(rawValue, name, charset)); + }); + } + return record; } final List<RecordField> inferredFieldTypes = new ArrayList<>(); @@ -522,7 +560,9 @@ public class DataTypeUtils { final RecordField recordField = new RecordField(key, inferredDataType, true); inferredFieldTypes.add(recordField); - final Object coercedValue = convertType(rawValue, inferredDataType, fieldName, charset); + final Object coercedValue = recursive + ? convertNestedObject(rawValue, key, charset) + : convertType(rawValue, inferredDataType, fieldName, charset); coercedValues.put(key, coercedValue); } @@ -946,11 +986,22 @@ public class DataTypeUtils { } public static Object[] convertRecordArrayToJavaArray(final Object[] array, final DataType elementDataType) { - if (array == null || array.length == 0 || Arrays.stream(array).allMatch(o -> isScalarValue(elementDataType, o))) { + if (array == null || array.length == 0) { return array; - } else { - return Arrays.stream(array).map(o -> convertRecordFieldtoObject(o, elementDataType)).toArray(); } + + final List<Object> objList = new ArrayList<>(array.length); + boolean nonScalarConverted = false; + for (final Object o : array) { + if (isScalarValue(elementDataType, o)) { + objList.add(o); + } else { + nonScalarConverted = true; + objList.add(convertRecordFieldtoObject(o, elementDataType)); + } + } + + return !nonScalarConverted ? 
array : objList.toArray(); } public static boolean isMapTypeCompatible(final Object value) { diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc index fae0538d64..49738c04c1 100644 --- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc +++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc @@ -936,7 +936,16 @@ The following record path expression would convert the record into an escaped JS === unescapeJson Converts a stringified JSON element to a Record, Array or simple field (e.g. String), using the UTF-8 character set. -Optionally convert JSON Objects parsed as Maps into Records (defaults to false). + +This function takes up to three arguments: + +1. The Record Path of the stringified JSON element to be parsed +2. (optional) whether to convert parsed JSON Objects from Maps to Records, default = `false` +3. (optional) whether to recursively convert nested JSON Objects from Maps to Records, default = `false` + +*Note*: RecordSetWriters will not be able to serialise Maps, so fields may be omitted if not converted to Records. +For example, using `unescapeJson` to replace the root ("/") of a Record in `UpdateRecord` without using the Record conversion, +would result in an empty output because the RecordSetWriter is unable to match the resultant Map content to a RecordSchema. 
For example, given a schema such as: @@ -989,6 +998,23 @@ The following record path expression would return: Given a record such as: +---- +{ + "json_str": "{\"name\":\"John\",\"age\":30,\"addresses\":[{\"address_1\": \"123 Fake Street\"}]}" +} +---- + +The following record path expression would return: + +|========================================================== +| RecordPath | Return value +| `unescapeJson(/json_str, 'false')` | {"name"="John", "age"=30, "addresses"=[{"address_1"="123 Fake Street"}]} (as a Map, with each entry in "addresses" as a Map) +| `unescapeJson(/json_str, 'true', 'false')` | {"name": "John", "age": 30, "addresses"=[{"address_1"="123 Fake Street"}]} (as a Record, with each entry in "addresses" as a Map) +| `unescapeJson(/json_str, 'true', 'true')` | {"name": "John", "age": 30, "addresses": [{"address_1": "123 Fake Street"}]} (as a Record, with each entry in "addresses" as a Record) +|========================================================== + +Given a record such as: + ---- { "json_str": "\"John\"" @@ -1002,7 +1028,7 @@ The following record path expression would return: | `unescapeJson(/json_str)` | "John" |========================================================== -Note that the target schema must be pre-defined if the unescaped JSON is to be set in a Record's fields - Infer Schema will not currently do this automatically. +*Note* that the target schema must be pre-defined if the `unescaped` JSON is to be set in a Record's fields - Infer Schema will not currently do this automatically. 
=== hash diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 7be81770b9..94659ad988 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -795,6 +795,7 @@ <exclude>src/test/resources/TestUpdateRecord/input/addresses.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/embedded-string.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/multi-arrays.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/input/organisation.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person-address.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person-stringified-name.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person-with-null-array.json</exclude> @@ -803,6 +804,7 @@ <exclude>src/test/resources/TestUpdateRecord/output/full-addresses.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/name-and-mother-same.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/name-fields-only.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/output/organisation.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude> @@ -815,6 +817,8 @@ <exclude>src/test/resources/TestUpdateRecord/schema/embedded-record.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc</exclude> + <exclude>src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc</exclude> + 
<exclude>src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-address.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude> diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java index 6375038e7f..e379fc2ccb 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java @@ -26,6 +26,7 @@ import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.LogMessage; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -36,6 +37,7 @@ import org.junit.jupiter.api.condition.OS; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; @@ -468,6 +470,68 @@ public class TestUpdateRecord { runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); } + @Test + public void testSetNestedRootWithUnescapeJsonCall() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + 
runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + final Path input = Paths.get("src/test/resources/TestUpdateRecord/input/organisation.json"); + + // recursive conversion + runner.enqueue(input); + runner.setProperty("/", "unescapeJson(/record_json, 'true', 'true')"); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/organisation.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).getFirst().assertContentEquals(expectedOutput); + assertTrue(runner.getLogger().getErrorMessages().isEmpty()); + + // no conversion + runner.clearTransferState(); + runner.enqueue(input); + runner.setProperty("/", "unescapeJson(/record_json)"); + + 
runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + expectedOutput = """ + [ { + "name" : null, + "departments" : null, + "address" : null + } ]"""; + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).getFirst().assertContentEquals(expectedOutput); + assertTrue(runner.getLogger().getErrorMessages().isEmpty()); + + // non-recursive conversion + runner.clearTransferState(); + runner.enqueue(input); + runner.setProperty("/", "unescapeJson(/record_json, 'true')"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_FAILURE, 1); + final LogMessage errorMessage = runner.getLogger().getErrorMessages().getFirst(); + assertTrue(errorMessage.getMsg().contains("ClassCastException")); + assertTrue(errorMessage.getMsg().contains("Record")); + } + @Test public void testSetFieldWithUnescapeJsonCall() throws InitializationException, IOException { final JsonTreeReader jsonReader = new JsonTreeReader(); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/organisation.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/organisation.json new file mode 100644 index 0000000000..d3280c53af --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/organisation.json @@ -0,0 +1,3 @@ +{ + "record_json": "{\"name\":\"An Organisation\",\"departments\":[{\"name\":\"Department 1\",\"manager\":\"Joe Bloggs\"},{\"name\":\"Coders\",\"manager\":\"Jane Biggs\"}],\"address\":{\"address_1\":\"123 Fake Street\",\"address_2\":\"Somewhere There\",\"postcode\":\"AB1 6HT\"}}" +} \ No newline at end of file diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/organisation.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/organisation.json new file mode 100644 index 0000000000..2251c9c219 --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/organisation.json @@ -0,0 +1,15 @@ +[ { + "name" : "An Organisation", + "departments" : [ { + "name" : "Department 1", + "manager" : "Joe Bloggs" + }, { + "name" : "Coders", + "manager" : "Jane Biggs" + } ], + "address" : { + "address_1" : "123 Fake Street", + "address_2" : "Somewhere There", + "postcode" : "AB1 6HT" + } +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc new file mode 100644 index 0000000000..620c4cb0ce --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc @@ -0,0 +1,8 @@ +{ + "name": "record_json", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "record_json", "type": "string" } + ] +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc new file mode 100644 index 0000000000..ec3bd1c9b9 --- /dev/null +++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc @@ -0,0 +1,78 @@ +{ + "fields": [ + { + "name": "name", + "type": [ + "string", + "null" + ] + }, + { + "name": "departments", + "type": [ + { + "items": { + "fields": [ + { + "name": "name", + "type": [ + "string", + "null" + ] + }, + { + "name": "manager", + "type": [ + "string", + "null" + ] + } + ], + "name": "department", + "namespace": "test.nifi", + "type": "record" + }, + "type": "array" + }, + "null" + ] + }, + { + "name": "address", + "type": [ + { + "fields": [ + { + "name": "address_1", + "type": [ + "string", + "null" + ] + }, + { + "name": "address_2", + "type": [ + "string", + "null" + ] + }, + { + "name": "postcode", + "type": [ + "string", + "null" + ] + } + ], + "name": "address", + "namespace": "test.nifi", + "type": "record" + }, + "null" + ] + } + ], + "name": "organisation", + "namespace": "test.nifi", + "type": "record" +} \ No newline at end of file