This is an automated email from the ASF dual-hosted git repository. mthomsen pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new e2067c4ea1 NIFI-10865 allow RecordPath's unescapeJson to convert de-serialised JSON Objects into Records e2067c4ea1 is described below commit e2067c4ea19f0b1b30ddb89717a190fe09f8b87f Author: Chris Sampson <chris.samp...@naimuri.com> AuthorDate: Tue Nov 22 11:44:53 2022 +0000 NIFI-10865 allow RecordPath's unescapeJson to convert de-serialised JSON Objects into Records NIFI-10865 allow UpdateRecord to replace the Record root for relative paths, e.g. when a RecordPath function is used to modify selected field(s) This closes #6708 Signed-off-by: Mike Thomsen <mthom...@apache.org> --- .../nifi/record/path/functions/UnescapeJson.java | 35 +++++++- .../nifi/record/path/paths/RecordPathCompiler.java | 11 ++- .../apache/nifi/record/path/TestRecordPath.java | 96 +++++++++++++++------- nifi-docs/src/main/asciidoc/record-path-guide.adoc | 22 ++++- .../nifi-standard-processors/pom.xml | 6 ++ .../nifi/processors/standard/UpdateRecord.java | 29 +++++-- .../nifi/processors/standard/TestUpdateRecord.java | 88 ++++++++++++++++++++ .../TestUpdateRecord/input/embedded-string.json | 4 + .../input/person-stringified-name.json | 4 + .../TestUpdateRecord/output/embedded-record.json | 9 ++ .../TestUpdateRecord/output/person-with-name.json | 7 ++ .../TestUpdateRecord/schema/embedded-record.avsc | 39 +++++++++ .../schema/person-with-stringified-name.avsc | 9 ++ 13 files changed, 316 insertions(+), 43 deletions(-) diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java index 6a18320fe8..35f7d93d3f 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/UnescapeJson.java @@ -23,7 +23,10 @@ import org.apache.nifi.record.path.RecordPathEvaluationContext; import org.apache.nifi.record.path.StandardFieldValue; import org.apache.nifi.record.path.exception.RecordPathException; import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.RecordDataType; @@ -31,21 +34,29 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; public class UnescapeJson extends RecordPathSegment { private final RecordPathSegment recordPath; + private final RecordPathSegment convertToRecordRecordPath; + private final ObjectMapper objectMapper = new ObjectMapper(); - public UnescapeJson(final RecordPathSegment recordPath, final boolean absolute) { + public UnescapeJson(final RecordPathSegment recordPath, final RecordPathSegment convertToRecordRecordPath, final boolean absolute) { super("unescapeJson", null, absolute); this.recordPath = recordPath; + this.convertToRecordRecordPath = convertToRecordRecordPath; } @Override public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final boolean convertMapToRecord = convertToRecordRecordPath != null + && Boolean.parseBoolean(RecordPathUtils.getFirstStringValue(convertToRecordRecordPath, context)); + final Stream<FieldValue> fieldValues = recordPath.evaluate(context); return fieldValues.filter(fv -> fv.getValue() != null) .map(fv -> { @@ -58,7 +69,10 @@ public class UnescapeJson extends RecordPathSegment { dataType = DataTypeUtils.chooseDataType(value, (ChoiceDataType) fv.getField().getDataType()); } - return new StandardFieldValue(convertFieldValue(value, fv.getField().getFieldName(), dataType), fv.getField(), fv.getParent().orElse(null)); + return new StandardFieldValue( + convertFieldValue(value, fv.getField().getFieldName(), dataType, convertMapToRecord), + fv.getField(), fv.getParent().orElse(null) + ); } catch (IOException e) { throw new RecordPathException("Unable to deserialise JSON String into Record Path value", e); } @@ -69,7 +83,7 @@ public class UnescapeJson extends RecordPathSegment { } @SuppressWarnings("unchecked") - private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType) throws IOException { + private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType, final boolean convertMapToRecord) throws IOException { if (dataType instanceof RecordDataType) { // convert Maps to Records final Map<String, Object> map = objectMapper.readValue(value.toString(), Map.class); @@ -85,7 +99,20 @@ public class UnescapeJson extends RecordPathSegment { return arr; } else { // generic conversion for simpler fields - return objectMapper.readValue(value.toString(), Object.class); + final Object parsed = objectMapper.readValue(value.toString(), Object.class); + if (convertMapToRecord) { + if (DataTypeUtils.isCompatibleDataType(parsed, RecordFieldType.RECORD.getDataType())) { + return DataTypeUtils.toRecord(parsed, fieldName); + } else if (DataTypeUtils.isArrayTypeCompatible(parsed, RecordFieldType.RECORD.getDataType())) { + return Arrays.stream((Object[]) parsed).map(m -> DataTypeUtils.toRecord(m, fieldName)).toArray(Record[]::new); + } else if (parsed instanceof Collection + && !((Collection<Object>) parsed).isEmpty() + && DataTypeUtils.isCompatibleDataType(((Collection<Object>) parsed).stream().findFirst().get(), RecordFieldType.RECORD.getDataType())) { + return ((Collection<Object>) parsed).stream().map(m -> DataTypeUtils.toRecord(m, fieldName)).collect(Collectors.toList()); + } + } + + return parsed; } } } diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index 7ae5b45dbc..f4ea12306a 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -329,8 +329,15 @@ public class RecordPathCompiler { return new EscapeJson(args[0], absolute); } case "unescapeJson": { - final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); - return new UnescapeJson(args[0], absolute); + final int numArgs = argumentListTree.getChildCount(); + + if (numArgs == 1) { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); + return new UnescapeJson(args[0], null, absolute); + } else { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new UnescapeJson(args[0], args[1], absolute); + } } case "hash":{ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index 41596e443d..e27c61d0fc 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -60,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +@SuppressWarnings("OptionalGetWithoutIsPresent") public class TestRecordPath { private static final String USER_TIMEZONE_PROPERTY = "user.timezone"; @@ -280,7 +281,7 @@ public class TestRecordPath { final Record record = new MapRecord(schema, values); final FieldValue fieldValue = RecordPath.compile("/attributes['city']").evaluate(record).getSelectedFields().findFirst().get(); - assertTrue(fieldValue.getField().getFieldName().equals("attributes")); + assertEquals("attributes", fieldValue.getField().getFieldName()); assertEquals("New York", fieldValue.getValue()); assertEquals(record, fieldValue.getParentRecord().get()); } @@ -300,7 +301,7 @@ public class TestRecordPath { final Record record = new MapRecord(schema, values); final FieldValue fieldValue = RecordPath.compile("/attributes/.['city']").evaluate(record).getSelectedFields().findFirst().get(); - assertTrue(fieldValue.getField().getFieldName().equals("attributes")); + assertEquals("attributes", fieldValue.getField().getFieldName()); assertEquals("New York", fieldValue.getValue()); assertEquals(record, fieldValue.getParentRecord().get()); } @@ -1094,21 +1095,24 @@ public class TestRecordPath { // Special character cases values.put("name", "John Doe"); - assertEquals( - "John\nDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\n')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + assertEquals("John\nDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\n')") + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing whitespace to new line"); values.put("name", "John\nDoe"); assertEquals("John Doe", RecordPath.compile("replaceRegex(/name, '\\n', ' ')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing new line to whitespace"); values.put("name", "John Doe"); assertEquals("John\tDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\t')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing whitespace to tab"); values.put("name", "John\tDoe"); assertEquals("John Doe", RecordPath.compile("replaceRegex(/name, '\\t', ' ')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing tab to whitespace"); } @@ -1126,23 +1130,27 @@ public class TestRecordPath { final Record record = new MapRecord(schema, values); // Quotes - // NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't to do so at NiFi UI. + // NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't do so at NiFi UI. // The test record path is equivalent to replaceRegex(/name, '\'', '"') values.put("name", "'John' 'Doe'"); assertEquals("\"John\" \"Doe\"", RecordPath.compile("replaceRegex(/name, '\\'', '\"')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing quote to double-quote"); values.put("name", "\"John\" \"Doe\""); assertEquals("'John' 'Doe'", RecordPath.compile("replaceRegex(/name, '\"', '\\'')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing double-quote to single-quote"); values.put("name", "'John' 'Doe'"); assertEquals("\"John\" \"Doe\"", RecordPath.compile("replaceRegex(/name, \"'\", \"\\\"\")") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing quote to double-quote, the function arguments are wrapped by double-quote"); values.put("name", "\"John\" \"Doe\""); assertEquals("'John' 'Doe'", RecordPath.compile("replaceRegex(/name, \"\\\"\", \"'\")") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing double-quote to single-quote, the function arguments are wrapped by double-quote"); } @@ -1160,15 +1168,17 @@ public class TestRecordPath { final Record record = new MapRecord(schema, values); // Back-slash - // NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't to do so at NiFi UI. + // NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't do so at NiFi UI. // The test record path is equivalent to replaceRegex(/name, '\\', '/') values.put("name", "John\\Doe"); assertEquals("John/Doe", RecordPath.compile("replaceRegex(/name, '\\\\', '/')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing a back-slash to forward-slash"); values.put("name", "John/Doe"); assertEquals("John\\Doe", RecordPath.compile("replaceRegex(/name, '/', '\\\\')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Replacing a forward-slash to back-slash"); } @@ -1188,11 +1198,13 @@ public class TestRecordPath { // Brackets values.put("name", "J[o]hn Do[e]"); assertEquals("J(o)hn Do(e)", RecordPath.compile("replaceRegex(replaceRegex(/name, '\\[', '('), '\\]', ')')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Square brackets can be escaped with back-slash"); values.put("name", "J(o)hn Do(e)"); assertEquals("J[o]hn Do[e]", RecordPath.compile("replaceRegex(replaceRegex(/name, '\\(', '['), '\\)', ']')") - .evaluate(record).getSelectedFields().findFirst().get().getValue()); + .evaluate(record).getSelectedFields().findFirst().get().getValue(), + "Brackets can be escaped with back-slash"); } @Test @@ -1629,8 +1641,8 @@ public class TestRecordPath { RecordPath.compile("base64Encode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); assertEquals(Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)), RecordPath.compile("base64Encode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); - assertTrue(Arrays.equals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)), - (byte[]) RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue())); + assertArrayEquals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)), + (byte[]) RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()); List<Object> actualValues = RecordPath.compile("base64Encode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList()); IntStream.range(0, 3).forEach(i -> { Object expectedObject = expectedValues.get(i); @@ -1638,7 +1650,7 @@ public class TestRecordPath { if (actualObject instanceof String) { assertEquals(expectedObject, actualObject); } else if (actualObject instanceof byte[]) { - assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject)); + assertArrayEquals((byte[]) expectedObject, (byte[]) actualObject); } }); } @@ -1660,7 +1672,7 @@ public class TestRecordPath { assertEquals("John", RecordPath.compile("base64Decode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); assertEquals("Doe", RecordPath.compile("base64Decode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue()); - assertTrue(Arrays.equals("xyz".getBytes(StandardCharsets.UTF_8), (byte[]) RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue())); + assertArrayEquals("xyz".getBytes(StandardCharsets.UTF_8), (byte[]) RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()); List<Object> actualValues = RecordPath.compile("base64Decode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList()); IntStream.range(0, 3).forEach(i -> { Object expectedObject = expectedValues.get(i); @@ -1668,7 +1680,7 @@ public class TestRecordPath { if (actualObject instanceof String) { assertEquals(expectedObject, actualObject); } else if (actualObject instanceof byte[]) { - assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject)); + assertArrayEquals((byte[]) expectedObject, (byte[]) actualObject); } }); } @@ -1733,8 +1745,8 @@ public class TestRecordPath { new RecordField("json_str", RecordFieldType.STRING.getDataType()) )); - // test CHOICE resulting in nested ARRAY of RECORDs - final Record recordAddressesArray = new MapRecord(schema, + // test CHOICE resulting in nested ARRAY of Records + final Record mapAddressesArray = new MapRecord(schema, Collections.singletonMap( "json_str", "{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}") @@ -1749,11 +1761,11 @@ public class TestRecordPath { Collections.singletonMap("address_1", "456 Anywhere Road") )); }}, - RecordPath.compile("unescapeJson(/json_str)").evaluate(recordAddressesArray).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + RecordPath.compile("unescapeJson(/json_str)").evaluate(mapAddressesArray).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() ); // test CHOICE resulting in nested single RECORD - final Record recordAddressesSingle = new MapRecord(schema, + final Record mapAddressesSingle = new MapRecord(schema, Collections.singletonMap( "json_str", "{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":{\"address_1\":\"123 Somewhere Street\"}}") @@ -1765,7 +1777,35 @@ public class TestRecordPath { put("nicknames", Arrays.asList("J", "Johnny")); put("addresses", Collections.singletonMap("address_1", "123 Somewhere Street")); }}, - RecordPath.compile("unescapeJson(/json_str)").evaluate(recordAddressesSingle).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + RecordPath.compile("unescapeJson(/json_str, 'false')").evaluate(mapAddressesSingle).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + ); + + // test single Record converted from Map Object + final Record recordFromMap = new MapRecord(schema, + Collections.singletonMap( + "json_str", + "{\"firstName\":\"John\",\"age\":30}") + ); + assertEquals( + DataTypeUtils.toRecord(new HashMap<String, Object>(){{ + put("firstName", "John"); + put("age", 30); + }}, "json_str"), + RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() + ); + + // test collection of Record converted from Map collection + final Record recordCollectionFromMaps = new MapRecord(schema, + Collections.singletonMap( + "json_str", + "[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]") + ); + assertEquals( + Arrays.asList( + DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Somewhere Street"), "json_str"), + DataTypeUtils.toRecord(Collections.singletonMap("address_1", "456 Anywhere Road"), "json_str") + ), + RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordCollectionFromMaps).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue() ); // test simple String field diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc index b2ef0f5ba7..5c914c7f55 100644 --- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc +++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc @@ -893,7 +893,10 @@ The following record path expression would convert the record into an escaped JS === unescapeJson -Converts a stringified JSON element to a Record, Array or simple field (e.g. String), using the UTF-8 character set. For example, given a schema such as: +Converts a stringified JSON element to a Record, Array or simple field (e.g. String), using the UTF-8 character set. +Optionally convert JSON Objects parsed as Maps into Records (defaults to false). + +For example, given a schema such as: ---- { @@ -927,6 +930,23 @@ The following record path expression would populate the record with unescaped JS Given a record such as: +---- +{ + "json_str": "{\"name\":\"John\",\"age\":30}" +} +---- + +The following record path expression would return: + +|========================================================== +| RecordPath | Return value +| `unescapeJson(/json_str, 'true')` | {"name": "John", "age": 30} (as a Record) +| `unescapeJson(/json_str, 'false')` | {"name"="John", "age"=30} (as a Map) +| `unescapeJson(/json_str)` | {"name"="John", "age"=30} (as a Map) +|========================================================== + +Given a record such as: + ---- { "json_str": "\"John\"" diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 00a67b8302..d9ab2bb669 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -661,23 +661,29 @@ <exclude>src/test/resources/TestExtractGrok/apache.log</exclude> <exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude> <exclude>src/test/resources/TestExtractGrok/patterns</exclude> + <exclude>src/test/resources/TestUpdateRecord/input/embedded-string.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person-address.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/input/person-stringified-name.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/person-with-null-array.json</exclude> <exclude>src/test/resources/TestUpdateRecord/input/multi-arrays.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/output/embedded-string.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-null-array.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/name-fields-only.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/name-and-mother-same.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/output/person-with-name.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-new-city.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/schema/embedded-record.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-address.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc</exclude> + <exclude>src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and1.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and2.json</exclude> diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index 21581f4b54..0cd26a58b0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -201,32 +201,40 @@ public class UpdateRecord extends AbstractRecordProcessor { } private Record processAbsolutePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record) { - final RecordPathResult replacementResult = replacementRecordPath.evaluate(record); - final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList()); + final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, null, record); final List<FieldValue> destinationFieldValues = destinationFields.collect(Collectors.toList()); return updateRecord(destinationFieldValues, selectedFields, record); } + private boolean isReplacingRoot(final List<FieldValue> destinationFields) { + return destinationFields.size() == 1 && !destinationFields.get(0).getParentRecord().isPresent(); + } + private Record processRelativePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, Record record) { final List<FieldValue> destinationFieldValues = destinationFields.collect(Collectors.toList()); - for (final FieldValue fieldVal : destinationFieldValues) { - final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldVal); - final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList()); - final Object replacementObject = getReplacementObject(selectedFields); - updateFieldValue(fieldVal, replacementObject); + if (isReplacingRoot(destinationFieldValues)) { + final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, destinationFieldValues.get(0), record); + record = updateRecord(destinationFieldValues, selectedFields, record); + } else { + for (final FieldValue fieldVal : destinationFieldValues) { + final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, fieldVal, record); + final Object replacementObject = getReplacementObject(selectedFields); + updateFieldValue(fieldVal, replacementObject); + } } return record; } private Record updateRecord(final List<FieldValue> destinationFields, final List<FieldValue> selectedFields, final Record record) { - if (destinationFields.size() == 1 && !destinationFields.get(0).getParentRecord().isPresent()) { + if (isReplacingRoot(destinationFields)) { final Object replacement = getReplacementObject(selectedFields); if (replacement == null) { return record; } + if (replacement instanceof Record) { return (Record) replacement; } @@ -262,6 +270,11 @@ public class UpdateRecord extends AbstractRecordProcessor { } } + private List<FieldValue> getSelectedFields(final RecordPath replacementRecordPath, final FieldValue fieldValue, final Record record) { + final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldValue); + return replacementResult.getSelectedFields().collect(Collectors.toList()); + } + private Object getReplacementObject(final List<FieldValue> selectedFields) { if (selectedFields.size() > 1) { final List<RecordField> fields = selectedFields.stream().map(FieldValue::getField).collect(Collectors.toList()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java index 9893833e14..6375038e7f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java @@ -438,6 +438,94 @@ public class TestUpdateRecord { runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); } + @Test + public void testSetRootWithUnescapeJsonCall() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person-stringified-name.json")); + runner.setProperty("/", "unescapeJson(/stringified_name, 'true')"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + + @Test + public void testSetFieldWithUnescapeJsonCall() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person-stringified-name.json")); + runner.setProperty("/name", "unescapeJson(/stringified_name)"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-name.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + + @Test + public void testSetNestedRecordWithUnescapeJsonCall() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/embedded-record.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, schemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, schemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/embedded-string.json")); + runner.setProperty("/embedded", "unescapeJson(/str)"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/embedded-record.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } @Test public void testSetRootPathRelativeWithMultipleValues() throws InitializationException, IOException { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/embedded-string.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/embedded-string.json new file mode 100644 index 0000000000..c0eeda0059 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/embedded-string.json @@ -0,0 +1,4 @@ +{ + "str": "{\"label\":\"Test!\",\"child\":{\"name\":\"Child record!\"}}", + "embedded": null +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person-stringified-name.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person-stringified-name.json new file mode 100644 index 0000000000..da4f643953 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person-stringified-name.json @@ -0,0 +1,4 @@ +{ + "id": 485, + "stringified_name": "{\"last\": \"Doe\", \"first\": \"John\"}" +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/embedded-record.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/embedded-record.json new file mode 100644 index 0000000000..d96cd6aa7d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/embedded-record.json @@ -0,0 +1,9 @@ +[ { + "str" : "{\"label\":\"Test!\",\"child\":{\"name\":\"Child record!\"}}", + "embedded" : { + "label" : "Test!", + "child" : { + "name" : "Child record!" + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-name.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-name.json new file mode 100644 index 0000000000..e153afedf6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-name.json @@ -0,0 +1,7 @@ +[ { + "id" : 485, + "name" : { + "last" : "Doe", + "first" : "John" + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/embedded-record.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/embedded-record.avsc new file mode 100644 index 0000000000..cfde1b1df6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/embedded-record.avsc @@ -0,0 +1,39 @@ +{ + "name": "Parent", + "type": "record", + "fields": [ + { + "name": "str", + "type": "string" + }, + { + "name": "embedded", + "type": [ + { + "name": "EmbeddedRecord", + "type": "record", + "fields": [ + { + "name": "label", + "type": "string" + }, + { + "name": "child", + "type": { + "name": "ChildRecord", + "type": "record", + "fields": [ + { + "name": "name", + "type": "string" + } + ] + } + } + ] + }, + "null" + ] + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc new file mode 100644 index 0000000000..d8f2bf25b4 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc @@ -0,0 +1,9 @@ +{ + "name": "personWithNameRecord", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "stringified_name", "type": "string" } + ] +} \ No newline at end of file