This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2d112871db NIFI-8134 allow unescapeJson Record Path function to
recursively convert Maps to Records (#7745)
2d112871db is described below
commit 2d112871db40c8f599d31974d14dd4a2227bf7da
Author: Chris Sampson <[email protected]>
AuthorDate: Tue May 14 22:11:28 2024 +0100
NIFI-8134 allow unescapeJson Record Path function to recursively convert
Maps to Records (#7745)
* NIFI-8134 recursively convert Java Objects to NiFi Records
---
.../nifi/record/path/functions/UnescapeJson.java | 17 +-
.../nifi/record/path/paths/RecordPathCompiler.java | 12 +-
.../apache/nifi/record/path/TestRecordPath.java | 183 ++++++++++++++-------
.../serialization/record/util/DataTypeUtils.java | 73 ++++++--
nifi-docs/src/main/asciidoc/record-path-guide.adoc | 30 +++-
.../nifi-standard-processors/pom.xml | 4 +
.../nifi/processors/standard/TestUpdateRecord.java | 64 +++++++
.../TestUpdateRecord/input/organisation.json | 3 +
.../TestUpdateRecord/output/organisation.json | 15 ++
.../organisation-with-departments-string.avsc | 8 +
.../schema/organisation-with-departments.avsc | 78 +++++++++
11 files changed, 400 insertions(+), 87 deletions(-)
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java
index 35f7d93d3f..d5e821a44b 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java
@@ -44,18 +44,23 @@ public class UnescapeJson extends RecordPathSegment {
private final RecordPathSegment convertToRecordRecordPath;
+ private final RecordPathSegment recursiveRecordConversion;
+
private final ObjectMapper objectMapper = new ObjectMapper();
- public UnescapeJson(final RecordPathSegment recordPath, final
RecordPathSegment convertToRecordRecordPath, final boolean absolute) {
+ public UnescapeJson(final RecordPathSegment recordPath, final
RecordPathSegment convertToRecordRecordPath, final RecordPathSegment
recursiveRecordConversion, final boolean absolute) {
super("unescapeJson", null, absolute);
this.recordPath = recordPath;
this.convertToRecordRecordPath = convertToRecordRecordPath;
+ this.recursiveRecordConversion = recursiveRecordConversion;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext
context) {
final boolean convertMapToRecord = convertToRecordRecordPath != null
&&
Boolean.parseBoolean(RecordPathUtils.getFirstStringValue(convertToRecordRecordPath,
context));
+ final boolean recursiveMapToRecord = recursiveRecordConversion != null
+ &&
Boolean.parseBoolean(RecordPathUtils.getFirstStringValue(recursiveRecordConversion,
context));
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
@@ -70,7 +75,7 @@ public class UnescapeJson extends RecordPathSegment {
}
return new StandardFieldValue(
- convertFieldValue(value,
fv.getField().getFieldName(), dataType, convertMapToRecord),
+ convertFieldValue(value,
fv.getField().getFieldName(), dataType, convertMapToRecord,
recursiveMapToRecord),
fv.getField(), fv.getParent().orElse(null)
);
} catch (IOException e) {
@@ -83,7 +88,7 @@ public class UnescapeJson extends RecordPathSegment {
}
@SuppressWarnings("unchecked")
- private Object convertFieldValue(final Object value, final String
fieldName, final DataType dataType, final boolean convertMapToRecord) throws
IOException {
+ private Object convertFieldValue(final Object value, final String
fieldName, final DataType dataType, final boolean convertMapToRecord, final
boolean recursiveMapToRecord) throws IOException {
if (dataType instanceof RecordDataType) {
// convert Maps to Records
final Map<String, Object> map =
objectMapper.readValue(value.toString(), Map.class);
@@ -102,13 +107,13 @@ public class UnescapeJson extends RecordPathSegment {
final Object parsed = objectMapper.readValue(value.toString(),
Object.class);
if (convertMapToRecord) {
if (DataTypeUtils.isCompatibleDataType(parsed,
RecordFieldType.RECORD.getDataType())) {
- return DataTypeUtils.toRecord(parsed, fieldName);
+ return DataTypeUtils.toRecord(parsed, fieldName,
recursiveMapToRecord);
} else if (DataTypeUtils.isArrayTypeCompatible(parsed,
RecordFieldType.RECORD.getDataType())) {
- return Arrays.stream((Object[]) parsed).map(m ->
DataTypeUtils.toRecord(m, fieldName)).toArray(Record[]::new);
+ return Arrays.stream((Object[]) parsed).map(m ->
DataTypeUtils.toRecord(m, fieldName,
recursiveMapToRecord)).toArray(Record[]::new);
} else if (parsed instanceof Collection
&& !((Collection<Object>) parsed).isEmpty()
&&
DataTypeUtils.isCompatibleDataType(((Collection<Object>)
parsed).stream().findFirst().get(), RecordFieldType.RECORD.getDataType())) {
- return ((Collection<Object>) parsed).stream().map(m ->
DataTypeUtils.toRecord(m, fieldName)).collect(Collectors.toList());
+ return ((Collection<Object>) parsed).stream().map(m ->
DataTypeUtils.toRecord(m, fieldName,
recursiveMapToRecord)).collect(Collectors.toList());
}
}
diff --git
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index 9061014295..a75b3376c0 100644
---
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -352,13 +352,11 @@ public class RecordPathCompiler {
case "unescapeJson": {
final int numArgs = argumentListTree.getChildCount();
- if (numArgs == 1) {
- final RecordPathSegment[] args =
getArgPaths(argumentListTree, 1, functionName, absolute);
- return new UnescapeJson(args[0], null, absolute);
- } else {
- final RecordPathSegment[] args =
getArgPaths(argumentListTree, 2, functionName, absolute);
- return new UnescapeJson(args[0], args[1],
absolute);
- }
+ final RecordPathSegment[] args =
getArgPaths(argumentListTree, numArgs, functionName, absolute);
+ final RecordPathSegment convertToRecord = numArgs > 1
? args[1] : null;
+ final RecordPathSegment recursiveConversion = numArgs
> 2 ? args[2] : null;
+
+ return new UnescapeJson(args[0], convertToRecord,
recursiveConversion, absolute);
}
case "hash":{
final RecordPathSegment[] args =
getArgPaths(argumentListTree, 2, functionName, absolute);
diff --git
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
index f374325528..a548ccd812 100644
---
a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
+++
b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java
@@ -17,6 +17,21 @@
package org.apache.nifi.record.path;
+import org.apache.nifi.record.path.exception.RecordPathException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.uuid5.Uuid5Util;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
@@ -27,28 +42,16 @@ import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.nifi.record.path.exception.RecordPathException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.type.ArrayDataType;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.apache.nifi.uuid5.Uuid5Util;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -1852,25 +1855,26 @@ public class TestRecordPath {
new RecordField("person",
RecordFieldType.RECORD.getRecordDataType(person))
));
- final Map<String, Object> values = new HashMap<String, Object>(){{
- put("person", new MapRecord(person, new HashMap<String, Object>(){{
- put("firstName", "John");
- put("age", 30);
- put("nicknames", new String[] {"J", "Johnny"});
- put("addresses", new MapRecord[]{
- new MapRecord(address,
Collections.singletonMap("address_1", "123 Somewhere Street")),
- new MapRecord(address,
Collections.singletonMap("address_1", "456 Anywhere Road"))
- });
- }}));
- }};
+ final Map<String, Object> values = Map.of(
+ "person", new MapRecord(person, Map.of(
+ "firstName", "John",
+ "age", 30,
+ "nicknames", new String[] {"J", "Johnny"},
+ "addresses", new MapRecord[]{
+ new MapRecord(address,
Collections.singletonMap("address_1", "123 Somewhere Street")),
+ new MapRecord(address,
Collections.singletonMap("address_1", "456 Anywhere Road"))
+ }
+ ) )
+ );
final Record record = new MapRecord(schema, values);
- assertEquals("\"John\"",
RecordPath.compile("escapeJson(/person/firstName)").evaluate(record).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue());
- assertEquals("30",
RecordPath.compile("escapeJson(/person/age)").evaluate(record).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue());
+ assertEquals("\"John\"",
RecordPath.compile("escapeJson(/person/firstName)").evaluate(record).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue());
+ assertEquals("30",
RecordPath.compile("escapeJson(/person/age)").evaluate(record).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue());
assertEquals(
-
"{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123
Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}",
-
RecordPath.compile("escapeJson(/person)").evaluate(record).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ """
+
{"firstName":"John","age":30,"nicknames":["J","Johnny"],"addresses":[{"address_1":"123
Somewhere Street"},{"address_1":"456 Anywhere Road"}]}""",
+
RecordPath.compile("escapeJson(/person)").evaluate(record).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()
);
}
@@ -1899,79 +1903,111 @@ public class TestRecordPath {
final Record mapAddressesArray = new MapRecord(schema,
Collections.singletonMap(
"json_str",
-
"{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123
Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}")
+ """
+
{"firstName":"John","age":30,"nicknames":["J","Johnny"],"addresses":[{"address_1":"123
Somewhere Street"},{"address_1":"456 Anywhere Road"}]}""")
);
assertEquals(
- new HashMap<String, Object>(){{
- put("firstName", "John");
- put("age", 30);
- put("nicknames", Arrays.asList("J", "Johnny"));
- put("addresses", Arrays.asList(
+ Map.of(
+ "firstName", "John",
+ "age", 30,
+ "nicknames", Arrays.asList("J", "Johnny"),
+ "addresses", Arrays.asList(
Collections.singletonMap("address_1", "123
Somewhere Street"),
Collections.singletonMap("address_1", "456
Anywhere Road")
- ));
- }},
-
RecordPath.compile("unescapeJson(/json_str)").evaluate(mapAddressesArray).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ )
+ ),
+
RecordPath.compile("unescapeJson(/json_str)").evaluate(mapAddressesArray).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()
);
// test CHOICE resulting in nested single RECORD
final Record mapAddressesSingle = new MapRecord(schema,
Collections.singletonMap(
"json_str",
-
"{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":{\"address_1\":\"123
Somewhere Street\"}}")
+ """
+
{"firstName":"John","age":30,"nicknames":["J","Johnny"],"addresses":{"address_1":"123
Somewhere Street"}}""")
);
assertEquals(
- new HashMap<String, Object>(){{
- put("firstName", "John");
- put("age", 30);
- put("nicknames", Arrays.asList("J", "Johnny"));
- put("addresses", Collections.singletonMap("address_1",
"123 Somewhere Street"));
- }},
- RecordPath.compile("unescapeJson(/json_str,
'false')").evaluate(mapAddressesSingle).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ Map.of(
+ "firstName", "John",
+ "age", 30,
+ "nicknames", Arrays.asList("J", "Johnny"),
+ "addresses", Collections.singletonMap("address_1", "123
Somewhere Street")
+ ),
+ RecordPath.compile("unescapeJson(/json_str,
'false')").evaluate(mapAddressesSingle).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()
);
// test single Record converted from Map Object
final Record recordFromMap = new MapRecord(schema,
Collections.singletonMap(
"json_str",
- "{\"firstName\":\"John\",\"age\":30}")
+ """
+ {"firstName":"John","age":30}""")
);
+ Map<String, Object> expectedMap = new LinkedHashMap<>();
+ expectedMap.put("firstName", "John");
+ expectedMap.put("age", 30);
assertEquals(
- DataTypeUtils.toRecord(new HashMap<String, Object>(){{
- put("firstName", "John");
- put("age", 30);
- }}, "json_str"),
- RecordPath.compile("unescapeJson(/json_str,
'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ DataTypeUtils.toRecord(expectedMap, "json_str"),
+ RecordPath.compile("unescapeJson(/json_str,
'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()
+ );
+
+ // test nested Record converted from Map Object
+ final Record nestedRecordFromMap = new MapRecord(schema,
+ Collections.singletonMap(
+ "json_str",
+ """
+
{"firstName":"John","age":30,"addresses":[{"address_1":"123 Fake Street"}]}""")
+ );
+ // recursively convert Maps to Records (addresses becomes an ARRAY of
Records)
+ expectedMap.put("addresses", new Object[]
{DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake
Street"), "addresses")});
+ assertRecordsMatch(
+ DataTypeUtils.toRecord(expectedMap, "json_str"),
+ RecordPath.compile("unescapeJson(/json_str, 'true',
'true')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()
+ );
+ // convert Map to Record, without recursion (addresses becomes an
ARRAY, but contents are still Maps)
+ expectedMap.put("addresses", new Object[]
{Collections.singletonMap("address_1", "123 Fake Street")});
+ assertRecordsMatch(
+ DataTypeUtils.toRecord(expectedMap, "json_str"),
+ RecordPath.compile("unescapeJson(/json_str, 'true',
'false')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()
+ );
+ // without Map conversion to Record (addresses remains a Collection,
Maps are unchanged)
+ assertMapsMatch(
+ expectedMap,
+ RecordPath.compile("unescapeJson(/json_str,
'false')").evaluate(nestedRecordFromMap).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue(),
+ false
);
// test collection of Record converted from Map collection
final Record recordCollectionFromMaps = new MapRecord(schema,
Collections.singletonMap(
"json_str",
- "[{\"address_1\":\"123 Somewhere
Street\"},{\"address_1\":\"456 Anywhere Road\"}]")
+ """
+ [{"address_1":"123 Somewhere
Street"},{"address_1":"456 Anywhere Road"}]""")
);
assertEquals(
Arrays.asList(
DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Somewhere
Street"), "json_str"),
DataTypeUtils.toRecord(Collections.singletonMap("address_1", "456 Anywhere
Road"), "json_str")
),
- RecordPath.compile("unescapeJson(/json_str,
'true')").evaluate(recordCollectionFromMaps).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ RecordPath.compile("unescapeJson(/json_str,
'true')").evaluate(recordCollectionFromMaps).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()
);
// test simple String field
- final Record recordJustName = new MapRecord(schema,
Collections.singletonMap("json_str", "{\"firstName\":\"John\"}"));
+ final Record recordJustName = new MapRecord(schema,
Collections.singletonMap("json_str",
+ """
+ {"firstName":"John"}"""));
assertEquals(
- new HashMap<String, Object>(){{put("firstName", "John");}},
-
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustName).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
+ Map.of("firstName", "John"),
+
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustName).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue()
);
// test simple String
final Record recordJustString = new MapRecord(schema,
Collections.singletonMap("json_str", "\"John\""));
- assertEquals("John",
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustString).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue());
+ assertEquals("John",
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustString).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue());
// test simple Int
final Record recordJustInt = new MapRecord(schema,
Collections.singletonMap("json_str", "30"));
- assertEquals(30,
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustInt).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue());
+ assertEquals(30,
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordJustInt).getSelectedFields().findFirst().orElseThrow(AssertionError::new).getValue());
// test invalid JSON
final Record recordInvalidJson = new MapRecord(schema,
Collections.singletonMap("json_str", "{\"invalid\": \"json"));
@@ -1979,7 +2015,7 @@ public class TestRecordPath {
RecordPathException rpe = assertThrows(RecordPathException.class,
() -> RecordPath.compile("unescapeJson(/json_str)")
.evaluate(recordInvalidJson).getSelectedFields()
-
.findFirst().orElseThrow(IllegalStateException::new).getValue());
+
.findFirst().orElseThrow(AssertionError::new).getValue());
assertEquals("Unable to deserialise JSON String into Record Path
value", rpe.getMessage());
// test not String
@@ -1987,10 +2023,35 @@ public class TestRecordPath {
IllegalArgumentException iae =
assertThrows(IllegalArgumentException.class,
() -> RecordPath.compile("unescapeJson(/person/age)")
.evaluate(recordNotString).getSelectedFields()
-
.findFirst().orElseThrow(IllegalStateException::new).getValue());
+
.findFirst().orElseThrow(AssertionError::new).getValue());
assertEquals("Argument supplied to unescapeJson must be a String",
iae.getMessage());
}
+ private void assertRecordsMatch(final Record expectedRecord, final Object
result) {
+ assertInstanceOf(Record.class, result);
+ final Record resultRecord = (Record) result;
+ assertMapsMatch(expectedRecord.toMap(), resultRecord.toMap(), true);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void assertMapsMatch(final Map<String, Object> expectedMap, final
Object result, final boolean convertMapToRecord) {
+ assertInstanceOf(Map.class, result);
+ final Map<String, Object> resultMap = (Map<String, Object>) result;
+ assertEquals(expectedMap.size(), resultMap.size());
+
+ for (final Map.Entry<String, Object> e : expectedMap.entrySet()) {
+ // can't directly assertEquals two Object[] as the #equals method
checks whether they're the same Object, rather than comparing the array content
+ if (e.getValue() instanceof Object[] expectedArray) {
+ final Object resultObj = resultMap.get(e.getKey());
+ // Record conversion changes Collections to Arrays, otherwise
they remain Collections
+ final Object[] resultArray = convertMapToRecord ? (Object[])
resultObj : ((Collection<?>) resultObj).toArray();
+ assertArrayEquals(expectedArray, resultArray);
+ } else {
+ assertEquals(e.getValue(), resultMap.get(e.getKey()));
+ }
+ }
+ }
+
@Test
public void testHash() {
final Record record = getCaseTestRecord();
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index de4ddf27a7..30a7e2737d 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -59,6 +59,7 @@ import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
@@ -463,7 +464,11 @@ public class DataTypeUtils {
}
public static Record toRecord(final Object value, final String fieldName) {
- return toRecord(value, fieldName, StandardCharsets.UTF_8);
+ return toRecord(value, fieldName, false);
+ }
+
+ public static Record toRecord(final Object value, final String fieldName,
final boolean recursive) {
+ return toRecord(value, fieldName, StandardCharsets.UTF_8, recursive);
}
public static RecordSchema inferSchema(final Map<String, Object> values,
final String fieldName, final Charset charset) {
@@ -472,7 +477,6 @@ public class DataTypeUtils {
}
final List<RecordField> inferredFieldTypes = new ArrayList<>();
- final Map<String, Object> coercedValues = new LinkedHashMap<>();
for (final Map.Entry<?, ?> entry : values.entrySet()) {
final Object keyValue = entry.getKey();
@@ -487,21 +491,55 @@ public class DataTypeUtils {
final RecordField recordField = new RecordField(key,
inferredDataType, true);
inferredFieldTypes.add(recordField);
- final Object coercedValue = convertType(rawValue,
inferredDataType, fieldName, charset);
- coercedValues.put(key, coercedValue);
+ convertType(rawValue, inferredDataType, fieldName, charset);
}
- final RecordSchema inferredSchema = new
SimpleRecordSchema(inferredFieldTypes);
- return inferredSchema;
+ return new SimpleRecordSchema(inferredFieldTypes);
}
public static Record toRecord(final Object value, final String fieldName,
final Charset charset) {
+ return toRecord(value, fieldName, charset, false);
+ }
+
+ private static Object convertNestedObject(final Object rawValue, final
String key, final Charset charset) {
+ final Object coercedValue;
+ if (rawValue instanceof Map<?, ?>) {
+ coercedValue = toRecord(rawValue, key, charset, true);
+ } else if (rawValue instanceof Object[]) {
+ final Object[] rawArray = (Object[]) rawValue;
+ final List<Object> objList = new ArrayList<>(rawArray.length);
+ for (final Object o : rawArray) {
+ objList.add(o instanceof Map<?, ?> ? toRecord(o, key, charset,
true) : o);
+ }
+ coercedValue = objList.toArray();
+ } else if (rawValue instanceof Collection<?>) {
+ final Collection<?> objCollection = (Collection<?>) rawValue;
+ // Records have ARRAY DataTypes, so convert any Collections
+ final List<Object> objList = new ArrayList<>(objCollection.size());
+ for (final Object o : objCollection) {
+ objList.add(o instanceof Map<?, ?> ? toRecord(o, key, charset,
true) : o);
+ }
+ coercedValue = objList.toArray();
+ } else {
+ coercedValue = rawValue;
+ }
+ return coercedValue;
+ }
+
+ public static Record toRecord(final Object value, final String fieldName,
final Charset charset, final boolean recursive) {
if (value == null) {
return null;
}
if (value instanceof Record) {
- return ((Record) value);
+ final Record record = ((Record) value);
+ if (recursive) {
+ record.getRawFieldNames().forEach(name -> {
+ final Object rawValue = record.getValue(name);
+ record.setValue(name, convertNestedObject(rawValue, name,
charset));
+ });
+ }
+ return record;
}
final List<RecordField> inferredFieldTypes = new ArrayList<>();
@@ -522,7 +560,9 @@ public class DataTypeUtils {
final RecordField recordField = new RecordField(key,
inferredDataType, true);
inferredFieldTypes.add(recordField);
- final Object coercedValue = convertType(rawValue,
inferredDataType, fieldName, charset);
+ final Object coercedValue = recursive
+ ? convertNestedObject(rawValue, key, charset)
+ : convertType(rawValue, inferredDataType, fieldName,
charset);
coercedValues.put(key, coercedValue);
}
@@ -946,11 +986,22 @@ public class DataTypeUtils {
}
public static Object[] convertRecordArrayToJavaArray(final Object[] array,
final DataType elementDataType) {
- if (array == null || array.length == 0 ||
Arrays.stream(array).allMatch(o -> isScalarValue(elementDataType, o))) {
+ if (array == null || array.length == 0) {
return array;
- } else {
- return Arrays.stream(array).map(o -> convertRecordFieldtoObject(o,
elementDataType)).toArray();
}
+
+ final List<Object> objList = new ArrayList<>(array.length);
+ boolean nonScalarConverted = false;
+ for (final Object o : array) {
+ if (isScalarValue(elementDataType, o)) {
+ objList.add(o);
+ } else {
+ nonScalarConverted = true;
+ objList.add(convertRecordFieldtoObject(o, elementDataType));
+ }
+ }
+
+ return !nonScalarConverted ? array : objList.toArray();
}
public static boolean isMapTypeCompatible(final Object value) {
diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
index fae0538d64..49738c04c1 100644
--- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc
@@ -936,7 +936,16 @@ The following record path expression would convert the
record into an escaped JS
=== unescapeJson
Converts a stringified JSON element to a Record, Array or simple field (e.g.
String), using the UTF-8 character set.
-Optionally convert JSON Objects parsed as Maps into Records (defaults to
false).
+
+This function takes up to three arguments:
+
+1. The Record Path of the stringified JSON element to be parsed
+2. (optional) whether to convert parsed JSON Objects from Maps to Records,
default = `false`
+3. (optional) whether to recursively convert nested JSON Objects from Maps to
Records, default = `false`
+
+*Note*: RecordSetWriters are not able to serialise Maps, so fields may be
omitted from the output if they are not converted to Records.
+For example, using `unescapeJson` in `UpdateRecord` to replace the root ("/") of a
Record without Record conversion
+would result in empty output, because the RecordSetWriter is unable to match
the resulting Map content to a RecordSchema.
For example, given a schema such as:
@@ -989,6 +998,23 @@ The following record path expression would return:
Given a record such as:
+----
+{
+ "json_str": "{\"name\":\"John\",\"age\":30,\"addresses\":[{\"address_1\":
\"123 Fake Street\"}]}"
+}
+----
+
+The following record path expression would return:
+
+|==========================================================
+| RecordPath | Return value
+| `unescapeJson(/json_str, 'false')` | {"name"="John", "age"=30,
"addresses"=[{"address_1"="123 Fake Street"}]} (as a Map, with each entry in
"addresses" as a Map)
+| `unescapeJson(/json_str, 'true', 'false')` | {"name": "John", "age": 30,
"addresses"=[{"address_1"="123 Fake Street"}]} (as a Record, with each entry in
"addresses" as a Map)
+| `unescapeJson(/json_str, 'true', 'true')` | {"name": "John", "age": 30,
"addresses": [{"address_1": "123 Fake Street"}]} (as a Record, with each entry
in "addresses" as a Record)
+|==========================================================
+
+Given a record such as:
+
----
{
"json_str": "\"John\""
@@ -1002,7 +1028,7 @@ The following record path expression would return:
| `unescapeJson(/json_str)` | "John"
|==========================================================
-Note that the target schema must be pre-defined if the unescaped JSON is to be
set in a Record's fields - Infer Schema will not currently do this
automatically.
+*Note* that the target schema must be pre-defined if the unescaped JSON is
to be set in a Record's fields - Infer Schema will not currently do this
automatically.
=== hash
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 7be81770b9..94659ad988 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -795,6 +795,7 @@
<exclude>src/test/resources/TestUpdateRecord/input/addresses.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/embedded-string.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/multi-arrays.json</exclude>
+
<exclude>src/test/resources/TestUpdateRecord/input/organisation.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/person-address.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/person-stringified-name.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/input/person-with-null-array.json</exclude>
@@ -803,6 +804,7 @@
<exclude>src/test/resources/TestUpdateRecord/output/full-addresses.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/name-and-mother-same.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/name-fields-only.json</exclude>
+
<exclude>src/test/resources/TestUpdateRecord/output/organisation.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude>
@@ -815,6 +817,8 @@
<exclude>src/test/resources/TestUpdateRecord/schema/embedded-record.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc</exclude>
+
<exclude>src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc</exclude>
+
<exclude>src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-address.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude>
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
index 6375038e7f..e379fc2ccb 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
@@ -26,6 +26,7 @@ import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -36,6 +37,7 @@ import org.junit.jupiter.api.condition.OS;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
@@ -468,6 +470,79 @@ public class TestUpdateRecord {
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
+    /**
+     * Verifies {@code unescapeJson} when its result replaces the root record ({@code /}):
+     * <ul>
+     *     <li>recursive conversion ({@code 'true', 'true'}) produces the nested organisation output;</li>
+     *     <li>no conversion writes a record containing only null fields;</li>
+     *     <li>non-recursive conversion ({@code 'true'}) routes the FlowFile to failure and logs a
+     *     ClassCastException mentioning Record.</li>
+     * </ul>
+     */
+    @Test
+    public void testSetNestedRootWithUnescapeJsonCall() throws InitializationException, IOException {
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+
+        final String inputSchemaText = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc"));
+        final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc"));
+
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+        runner.enableControllerService(jsonWriter);
+
+        // Shared by all three scenarios below, so it is configured only once.
+        runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
+        final Path input = Paths.get("src/test/resources/TestUpdateRecord/input/organisation.json");
+
+        // recursive conversion
+        runner.enqueue(input);
+        runner.setProperty("/", "unescapeJson(/record_json, 'true', 'true')");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+        String expectedOutput = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/output/organisation.json"));
+        runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).getFirst().assertContentEquals(expectedOutput);
+        assertTrue(runner.getLogger().getErrorMessages().isEmpty());
+
+        // no conversion
+        runner.clearTransferState();
+        runner.enqueue(input);
+        runner.setProperty("/", "unescapeJson(/record_json)");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+        expectedOutput = """
+                [ {
+                  "name" : null,
+                  "departments" : null,
+                  "address" : null
+                } ]""";
+        runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).getFirst().assertContentEquals(expectedOutput);
+        assertTrue(runner.getLogger().getErrorMessages().isEmpty());
+
+        // non-recursive conversion
+        runner.clearTransferState();
+        runner.enqueue(input);
+        runner.setProperty("/", "unescapeJson(/record_json, 'true')");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(UpdateRecord.REL_FAILURE, 1);
+        // Both earlier scenarios asserted that no errors were logged, so the first error message
+        // is the one produced by this run (even if the mock logger retains messages across runs).
+        final LogMessage errorMessage = runner.getLogger().getErrorMessages().getFirst();
+        assertTrue(errorMessage.getMsg().contains("ClassCastException"));
+        assertTrue(errorMessage.getMsg().contains("Record"));
+    }
+
@Test
public void testSetFieldWithUnescapeJsonCall() throws
InitializationException, IOException {
final JsonTreeReader jsonReader = new JsonTreeReader();
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/organisation.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/organisation.json
new file mode 100644
index 0000000000..d3280c53af
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/organisation.json
@@ -0,0 +1,3 @@
+{
+ "record_json": "{\"name\":\"An Organisation\",\"departments\":[{\"name\":\"Department 1\",\"manager\":\"Joe Bloggs\"},{\"name\":\"Coders\",\"manager\":\"Jane Biggs\"}],\"address\":{\"address_1\":\"123 Fake Street\",\"address_2\":\"Somewhere There\",\"postcode\":\"AB1 6HT\"}}"
+}
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/organisation.json b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/organisation.json
new file mode 100644
index 0000000000..2251c9c219
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/organisation.json
@@ -0,0 +1,15 @@
+[ {
+ "name" : "An Organisation",
+ "departments" : [ {
+ "name" : "Department 1",
+ "manager" : "Joe Bloggs"
+ }, {
+ "name" : "Coders",
+ "manager" : "Jane Biggs"
+ } ],
+ "address" : {
+ "address_1" : "123 Fake Street",
+ "address_2" : "Somewhere There",
+ "postcode" : "AB1 6HT"
+ }
+} ]
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc
new file mode 100644
index 0000000000..620c4cb0ce
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments-string.avsc
@@ -0,0 +1,8 @@
+{
+ "name": "record_json",
+ "namespace": "nifi",
+ "type": "record",
+ "fields": [
+ { "name": "record_json", "type": "string" }
+ ]
+}
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc
new file mode 100644
index 0000000000..ec3bd1c9b9
--- /dev/null
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/organisation-with-departments.avsc
@@ -0,0 +1,78 @@
+{
+ "fields": [
+ {
+ "name": "name",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "departments",
+ "type": [
+ {
+ "items": {
+ "fields": [
+ {
+ "name": "name",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "manager",
+ "type": [
+ "string",
+ "null"
+ ]
+ }
+ ],
+ "name": "department",
+ "namespace": "test.nifi",
+ "type": "record"
+ },
+ "type": "array"
+ },
+ "null"
+ ]
+ },
+ {
+ "name": "address",
+ "type": [
+ {
+ "fields": [
+ {
+ "name": "address_1",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "address_2",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "postcode",
+ "type": [
+ "string",
+ "null"
+ ]
+ }
+ ],
+ "name": "address",
+ "namespace": "test.nifi",
+ "type": "record"
+ },
+ "null"
+ ]
+ }
+ ],
+ "name": "organisation",
+ "namespace": "test.nifi",
+ "type": "record"
+}
\ No newline at end of file