Repository: nifi Updated Branches: refs/heads/master 1a3c525dd -> 6d16fdf17
NIFI-3952: Updated UpdateRecord to pass field-related variables to the Expression Language This closes #1836. Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6d16fdf1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6d16fdf1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6d16fdf1 Branch: refs/heads/master Commit: 6d16fdf170db72cca41ef5c450f2b75ae2e74699 Parents: 1a3c525 Author: Mark Payne <marka...@hotmail.com> Authored: Mon May 22 10:51:31 2017 -0400 Committer: Bryan Bende <bbe...@apache.org> Committed: Mon May 22 14:32:45 2017 -0400 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 1 + .../nifi/processors/standard/UpdateRecord.java | 27 +++++++-- .../additionalDetails.html | 58 ++++++++++++++++++++ .../processors/standard/TestUpdateRecord.java | 30 ++++++++++ .../output/person-with-capital-lastname.json | 7 +++ 5 files changed, 119 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6d16fdf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index a0932e0..d8021e9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -457,6 +457,7 @@ <exclude>src/test/resources/TestUpdateRecord/input/person.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude> <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude> http://git-wip-us.apache.org/repos/asf/nifi/blob/6d16fdf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index 9151cde..6acc789 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -20,21 +20,24 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; @@ -47,6 +50,7 @@ import org.apache.nifi.record.path.util.RecordPathCache; import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; @EventDriven @@ -60,12 +64,16 @@ import org.apache.nifi.serialization.record.RecordSchema; + "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.") @SeeAlso({ConvertRecord.class}) public class UpdateRecord extends AbstractRecordProcessor { + private static final String FIELD_NAME = "field.name"; + private static final String FIELD_VALUE = "field.value"; + private static final String FIELD_TYPE = "field.type"; private volatile RecordPathCache recordPathCache; private volatile List<String> recordPaths; static final AllowableValue LITERAL_VALUES = new AllowableValue("literal-value", "Literal Value", - "The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with."); + "The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with. Expression Language " + + "may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated."); static final AllowableValue RECORD_PATH_VALUES = new AllowableValue("record-path-value", "Record Path Value", "The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path " + "that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. Note that if this option is selected, " @@ -142,8 +150,8 @@ public class UpdateRecord extends AbstractRecordProcessor { final RecordPath recordPath = recordPathCache.getCompiled(recordPathText); final RecordPathResult result = recordPath.evaluate(record); - final String replacementValue = context.getProperty(recordPathText).evaluateAttributeExpressions(flowFile).getValue(); if (evaluateValueAsRecordPath) { + final String replacementValue = context.getProperty(recordPathText).evaluateAttributeExpressions(flowFile).getValue(); final RecordPath replacementRecordPath = recordPathCache.getCompiled(replacementValue); // If we have an Absolute RecordPath, we need to evaluate the RecordPath only once against the Record. @@ -154,7 +162,18 @@ public class UpdateRecord extends AbstractRecordProcessor { processRelativePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue); } } else { - result.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue)); + final PropertyValue replacementValue = context.getProperty(recordPathText); + final Map<String, String> fieldVariables = new HashMap<>(4); + + result.getSelectedFields().forEach(fieldVal -> { + fieldVariables.clear(); + fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName()); + fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null)); + fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name()); + + final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue(); + fieldVal.updateValue(evaluatedReplacementVal); + }); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6d16fdf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html index c20f48a..b73b4b4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html @@ -328,5 +328,63 @@ these simple type coercions for us. </p> + + + + <h3>Example 5 - Use Expression Language to Modify Value</h3> + + <p> + This example will capitalize the value of all 'name' fields, regardless of + where in the Record hierarchy the field is found. This is done by referencing the 'field.value' variable in the Expression Language. + We can also access the field.name variable and the field.type variable. + </p> + + <table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>Replacement Value Strategy</td> + <td>Literal Value</td> + </tr> + <tr> + <td>//name</td> + <td>${field.value:toUpper()}</td> + </tr> + </table> + + <p> + This will yield the following output: + </p> + +<code> +<pre> + [{ + "id": 17, + "name": "JOHN", + "child": { + "id": "1" + }, + "siblingIds": [4, 8], + "siblings": [ + { "name": "JEREMY", "id": 4 }, + { "name": "JULIA", "id": 8 } + ] + }, + { + "id": 98, + "name": "JANE", + "child": { + "id": 2 + }, + "gender": "F", + "siblingIds": [], + "siblings": [] + }] +</pre> +</code> + + </body> </html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6d16fdf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java index 6d88e57..2c1f6ff 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java @@ -214,4 +214,34 @@ public class TestUpdateRecord { final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json"))); runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); } + + @Test + public void testFieldValuesInEL() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); + runner.setProperty("/name/last", "${field.value:toUpper()}"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.LITERAL_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6d16fdf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json new file mode 100644 index 0000000..db847a6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json @@ -0,0 +1,7 @@ +[ { + "id" : 485, + "name" : { + "last" : "DOE", + "first" : "John" + } +} ] \ No newline at end of file