Repository: nifi Updated Branches: refs/heads/master 16898668c -> 35e8bedcc
NIFI-3249 - UpdateAttribute performance improvements This closes #1356 Signed-off-by: jpercivall <jperciv...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/35e8bedc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/35e8bedc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/35e8bedc Branch: refs/heads/master Commit: 35e8bedcc878ceb67638a50ff6c2506a7bb92c75 Parents: 1689866 Author: Bryan Rosander <brosan...@apache.org> Authored: Thu Dec 22 12:02:31 2016 -0500 Committer: jpercivall <jperciv...@apache.org> Committed: Wed Dec 28 14:54:05 2016 -0500 ---------------------------------------------------------------------- .../processors/attributes/UpdateAttribute.java | 115 +++++++++++-------- .../additionalDetails.html | 6 + 2 files changed, 75 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/35e8bedc/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 86f523a..4dee379 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -44,6 +44,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import 
org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -144,16 +145,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { }; // static properties + public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression"; public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder() - .name("Delete Attributes Expression") - .description("Regular expression for attributes to be deleted from FlowFiles.") + .name(DELETE_ATTRIBUTES_EXPRESSION_NAME) + .displayName(DELETE_ATTRIBUTES_EXPRESSION_NAME) + .description("Regular expression for attributes to be deleted from FlowFiles. Existing attributes that match will be deleted regardless of whether they are updated by this processor.") .required(false) .addValidator(DELETE_PROPERTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + public static final String STORE_STATE_NAME = "Store State"; public static final PropertyDescriptor STORE_STATE = new PropertyDescriptor.Builder() - .name("Store State") + .name(STORE_STATE_NAME) + .displayName(STORE_STATE_NAME) .description("Select whether or not state will be stored. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " + "FlowFile in a stateless manner. Selecting a stateful option will not only store the attributes on the FlowFile but also in the Processors " + "state. 
See the 'Stateful Usage' topic of the 'Additional Details' section of this processor's documentation for more information") @@ -161,14 +166,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { .allowableValues(DO_NOT_STORE_STATE, STORE_STATE_LOCALLY) .defaultValue(DO_NOT_STORE_STATE) .build(); + + public static final String STATEFUL_VARIABLES_INIT_VALUE_NAME = "Stateful Variables Initial Value"; public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder() - .name("Stateful Variables Initial Value") + .name(STATEFUL_VARIABLES_INIT_VALUE_NAME) + .displayName(STATEFUL_VARIABLES_INIT_VALUE_NAME) .description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " + "when state does not contain a value for the variable. This is required if running statefully but can be empty if needed.") .required(false) .addValidator(Validator.VALID) .build(); + private volatile Map<String, Action> defaultActions; + private volatile boolean debugEnabled; + public UpdateAttribute() { relationships = statelessRelationshipSet; @@ -259,6 +270,14 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { context.getStateManager().setState(tempMap, Scope.LOCAL); } + + defaultActions = getDefaultActions(context.getProperties()); + debugEnabled = getLogger().isDebugEnabled(); + } + + @OnStopped + public void onStopped() { + defaultActions = null; } @Override @@ -397,11 +416,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { return; } - final Map<PropertyDescriptor, String> properties = context.getProperties(); - - // get the default actions - final Map<String, Action> defaultActions = getDefaultActions(properties); - // record which rule should be applied to which flow file - when operating // in 'use clone' mode, this collection will contain a number 
of entries // that map to single element lists. this is because the original flowfile @@ -427,6 +441,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { return; } + Map<String, Action> defaultActions = this.defaultActions; + // if there is update criteria specified, evaluate it if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) { // apply the actions for each rule and transfer the flowfile @@ -493,7 +509,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { rulesForFlowFile.add(rule); // log if appropriate - if (logger.isDebugEnabled()) { + if (debugEnabled) { logger.debug(this + " all conditions met for rule '" + rule.getName() + "'. Using flow file - " + flowfileToUse); } } @@ -517,16 +533,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } private PropertyValue getPropertyValue(final String text, final ProcessContext context) { - PropertyValue currentValue = propertyValues.get(text); - if (currentValue == null) { - currentValue = context.newPropertyValue(text); - PropertyValue previousValue = propertyValues.putIfAbsent(text, currentValue); - if (previousValue != null) { - currentValue = previousValue; - } - } - - return currentValue; + return propertyValues.computeIfAbsent(text, k -> context.newPropertyValue(text)); } // Evaluates the specified condition on the specified flowfile. 
@@ -580,29 +587,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { statefulAttributesToSet = null; } - // go through each action + boolean debugEnabled = this.debugEnabled; for (final Action action : actions.values()) { - if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) { - try { - final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue(); - - // log if appropriate - if (logger.isDebugEnabled()) { - logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName)); - } - - if (statefulAttributesToSet != null) { - if(!action.getAttribute().equals("UpdateAttribute.matchedRule")) { - statefulAttributesToSet.put(action.getAttribute(), newAttributeValue); - } - } - - attributesToUpdate.put(action.getAttribute(), newAttributeValue); - } catch (final ProcessException pe) { - throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe); - } - } else { + String attribute = action.getAttribute(); + if (DELETE_ATTRIBUTES_EXPRESSION_NAME.equals(attribute)) { try { final String actionValue = action.getValue(); final String regex = (actionValue == null) ? 
null : @@ -614,17 +603,43 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { if (pattern.matcher(key).matches()) { // log if appropriate - if (logger.isDebugEnabled()) { - logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this, - key, flowfile, regex)); + if (debugEnabled) { + logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this, key, flowfile, regex)); } attributesToDelete.add(key); } } + // No point in updating if they will be removed + attributesToUpdate.keySet().removeAll(attributesToDelete); } } catch (final ProcessException pe) { - throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", action.getAttribute(), pe), pe); + throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", attribute, pe), pe); + } + } else { + boolean notDeleted = !attributesToDelete.contains(attribute); + boolean setStatefulAttribute = statefulAttributesToSet != null && !attribute.equals("UpdateAttribute.matchedRule"); + + if (notDeleted || setStatefulAttribute) { + try { + final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue(); + + // log if appropriate + if (debugEnabled) { + logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, attribute, newAttributeValue, flowfile, ruleName)); + } + + if (setStatefulAttribute) { + statefulAttributesToSet.put(attribute, newAttributeValue); + } + + // No point in updating if it will be removed + if (notDeleted) { + attributesToUpdate.put(attribute, newAttributeValue); + } + } catch (final ProcessException pe) { + throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", attribute, pe), pe); + } } } } @@ -644,7 +659,15 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } // update and delete the FlowFile 
attributes - FlowFile returnFlowfile = session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete); + FlowFile returnFlowfile = flowfile; + + if (attributesToUpdate.size() > 0) { + returnFlowfile = session.putAllAttributes(returnFlowfile, attributesToUpdate); + } + + if (attributesToDelete.size() > 0) { + returnFlowfile = session.removeAllAttributes(returnFlowfile, attributesToDelete); + } if(statefulAttributesToSet != null) { context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL); http://git-wip-us.apache.org/repos/asf/nifi/blob/35e8bedc/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html index 8a60c8f..e2a9d83 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html @@ -33,6 +33,12 @@ </p> <p> + Please note that "Delete Attributes Expression" supersedes any updates that occur. If an existing attribute matches the "Delete Attributes Expression", it will be removed whether it + was updated or not. 
The "Delete Attributes Expression" only applies to attributes that exist in the input FlowFile; if an attribute is added by this processor, the "Delete Attributes Expression" + will not detect it. + </p> + + <p> <strong>Basic Usage</strong> </p>