Repository: nifi
Updated Branches:
  refs/heads/master a414f1d51 -> e53ab391f


NIFI-641 Add delete capability to the UpdateAttributes processor.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e53ab391
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e53ab391
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e53ab391

Branch: refs/heads/master
Commit: e53ab391fa0a9a8e97b82367f92e820d0dcb3937
Parents: a414f1d
Author: Joe Skora <jsk...@gmail.com>
Authored: Thu Sep 24 09:22:17 2015 -0400
Committer: Bryan Bende <bbe...@apache.org>
Committed: Thu Sep 24 10:46:10 2015 -0400

----------------------------------------------------------------------
 .../processors/attributes/UpdateAttribute.java  |  84 +++++++++++----
 .../additionalDetails.html                      |  25 ++++-
 .../update/attributes/TestUpdateAttribute.java  | 104 ++++++++++++++++---
 3 files changed, 178 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e53ab391/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index c52eba5..dd81289 100644
--- 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
 
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -64,7 +65,10 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 
 /**
  * This processor supports updating flowfile attributes and can do so
- * conditionally or unconditionally. Like the FlowFileMetadataEnhancer, it can
+ * conditionally or unconditionally.  It can also delete flowfile attributes
+ * that match a regular expression.
+ *
+ * Like the FlowFileMetadataEnhancer, it can
  * be configured with an arbitrary number of optional properties to define how
  * attributes should be updated. Each optional property represents an action
  * that is applied to all incoming flow files. An action is comprised of an
@@ -112,11 +116,11 @@ import 
org.apache.nifi.annotation.behavior.WritesAttribute;
  */
 @EventDriven
 @SideEffectFree
-@Tags({"attributes", "modification", "update", "Attribute Expression 
Language"})
-@CapabilityDescription("Updates the Attributes for a FlowFile by using the 
Attribute Expression Language")
+@Tags({"attributes", "modification", "update", "delete", "Attribute Expression 
Language"})
+@CapabilityDescription("Updates the Attributes for a FlowFile by using the 
Attribute Expression Language and/or deletes the attributes based on a regular 
expression")
 @DynamicProperty(name = "A FlowFile attribute to update", value = "The value 
to set it to", supportsExpressionLanguage = true,
         description = "Updates a FlowFile attribute specified by the Dynamic 
Property's key with the value specified by the Dynamic Property's value")
-@WritesAttribute(attribute = "See additional details", description = "This 
processor may write zero or more attributes as described in additional details")
+@WritesAttribute(attribute = "See additional details", description = "This 
processor may write or remove zero or more attributes as described in 
additional details")
 public class UpdateAttribute extends AbstractProcessor implements Searchable {
 
     private final AtomicReference<Criteria> criteriaCache = new 
AtomicReference<>(null);
@@ -124,6 +128,14 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
 
     private final Set<Relationship> relationships;
 
+    // Static (non-dynamic) property: a regex; every attribute whose entire name matches it is removed.
+    public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
+            .name("Delete Attributes Expression")
+            .required(false)
+            .description("Regular expression for attributes to be deleted from flowfiles.")
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+
     // relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .description("All FlowFiles are routed to this 
relationship").name("success").build();
@@ -140,6 +152,13 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
     }
 
     @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        // DELETE_ATTRIBUTES is the only static property; all other properties are dynamic.
+        final List<PropertyDescriptor> supported = new ArrayList<>(Collections.singletonList(DELETE_ATTRIBUTES));
+        return Collections.unmodifiableList(supported);
+    }
+
+    @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
                 .name(propertyDescriptorName)
@@ -435,40 +454,65 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
         }
 
         // attribute values that will be applied to the flow file
-        final Map<String, String> attributes = new HashMap<>(actions.size());
+        final Map<String, String> attributesToUpdate = new 
HashMap<>(actions.size());
+        final Set<String> attributesToDelete = new HashSet<>(actions.size());
 
         // go through each action
         for (final Action action : actions.values()) {
-            try {
-                final String newAttributeValue = 
getPropertyValue(action.getValue(), 
context).evaluateAttributeExpressions(flowfile).getValue();
+            if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) {
+                try {
+                    final String newAttributeValue = 
getPropertyValue(action.getValue(), 
context).evaluateAttributeExpressions(flowfile).getValue();
 
-                // log if appropriate
-                if (logger.isDebugEnabled()) {
-                    logger.debug(String.format("%s setting attribute '%s' = 
'%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, 
flowfile, ruleName));
-                }
+                    // log if appropriate
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(String.format("%s setting attribute '%s' 
= '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, 
flowfile, ruleName));
+                    }
 
-                attributes.put(action.getAttribute(), newAttributeValue);
-            } catch (final ProcessException pe) {
-                throw new ProcessException(String.format("Unable to evaluate 
new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
+                    attributesToUpdate.put(action.getAttribute(), 
newAttributeValue);
+                } catch (final ProcessException pe) {
+                    throw new ProcessException(String.format("Unable to 
evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
+                }
+            } else {
+                try {
+                    final String regex = action.getValue();
+                    if (regex != null) {
+                        Pattern pattern = Pattern.compile(regex);
+                        final Set<String> attributeKeys = 
flowfile.getAttributes().keySet();
+                        for (final String key : attributeKeys) {
+                            if (pattern.matcher(key).matches()) {
+
+                                // log if appropriate
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug(String.format("%s deleting 
attribute '%s' for %s per regex '%s'.", this,
+                                            key, flowfile, regex));
+                                }
+
+                                attributesToDelete.add(key);
+                            }
+                        }
+                    }
+                } catch (final ProcessException pe) {
+                    throw new ProcessException(String.format("Unable to delete 
attribute '%s': %s.", action.getAttribute(), pe), pe);
+                }
             }
         }
 
         // If the 'alternate.identifier' attribute is added, then we want to 
create an ADD_INFO provenance event.
-        final String alternateIdentifier = 
attributes.get(CoreAttributes.ALTERNATE_IDENTIFIER.key());
-        if (alternateIdentifier != null) {
+        final String alternateIdentifierAdd = 
attributesToUpdate.get(CoreAttributes.ALTERNATE_IDENTIFIER.key());
+        if (alternateIdentifierAdd != null) {
             try {
-                final URI uri = new URI(alternateIdentifier);
+                final URI uri = new URI(alternateIdentifierAdd);
                 final String namespace = uri.getScheme();
                 if (namespace != null) {
-                    final String identifier = 
alternateIdentifier.substring(Math.min(namespace.length() + 1, 
alternateIdentifier.length() - 1));
+                    final String identifier = 
alternateIdentifierAdd.substring(Math.min(namespace.length() + 1, 
alternateIdentifierAdd.length() - 1));
                     session.getProvenanceReporter().associate(flowfile, 
namespace, identifier);
                 }
             } catch (final URISyntaxException e) {
             }
         }
 
-        // update the flowfile attributes
-        return session.putAllAttributes(flowfile, attributes);
+        // update and delete the flowfile attributes
+        return session.removeAllAttributes(session.putAllAttributes(flowfile, 
attributesToUpdate), attributesToDelete);
     }
 
     // Gets the default actions.

http://git-wip-us.apache.org/repos/asf/nifi/blob/e53ab391/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
index 30cfce9..cb23635 100644
--- 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
@@ -26,9 +26,10 @@
         <h2>Description:</h2>
         <p>
             This processor updates the attributes of a FlowFile using 
properties or rules that are added by the user. 
-            There are two ways to use this processor to add or modify 
attributes. One way is the "Basic Usage"; this allows you to set default 
attribute changes that affect 
+            There are three ways to use this processor to add, modify, or delete attributes. One way is the "Basic Usage"; this allows you to set default attribute changes that affect
             every FlowFile going through the processor. The second way is the 
"Advanced Usage"; this allows you to make conditional attribute changes that 
only affect a FlowFile if it 
-            meets certain conditions. It is possible to use both methods in 
the same processor at the same time.
+            meets certain conditions. It is possible to use both the Basic and Advanced methods in the same processor at the same time.  The third way is the "Delete Attributes Expression"; this allows you to
+            provide a regular expression, and any attributes with a matching name will be deleted.
         </p>
 
         <p>
@@ -189,6 +190,26 @@
             a type of "else" construct. In other words, if none of the rules 
match for the attribute, then the basic usage changes will be made.
         </p>
 
+        <p><strong>Deleting Attributes</strong></p>
+
+        <p>
+            Deleting attributes is as simple as providing a regular expression for attribute names to be deleted.  This can be a simple regular expression that will
+            match a single attribute, or a more complex regular expression to match a group of similarly named attributes or even several individual attribute names. The expression must match the entire attribute name, not just part of it.
+        </p>
+        <ul>
+            <li><strong>lastUser</strong> - will delete an attribute with the 
name "lastUser".
+            </li>
+            <li><strong>user.*</strong> - will delete attributes beginning with "user", including for example "username", "userName", "userID", and "users".  But
+                it will not delete "User" or "localuser".
+            </li>
+            <li><strong>(user.*|host.*|.*Date)</strong> - will delete "user", 
"username", "userName", "hostInfo", "hosts", and "updateDate", but not "User", 
"HOST", "update", or "updatedate".
+            </li>
+        </ul>
+
+        <p>
+            The delete attributes function does not produce a Provenance Event if the <strong>alternate.identifier</strong> Attribute is deleted.
+        </p>
+
         <p><strong>FlowFile Policy</strong>
         </p>
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e53ab391/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
index a5d74f7..f1b75ed 100644
--- 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
+++ 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
@@ -16,11 +16,6 @@
  */
 package org.apache.nifi.update.attributes;
 
-import org.apache.nifi.update.attributes.FlowFilePolicy;
-import org.apache.nifi.update.attributes.Criteria;
-import org.apache.nifi.update.attributes.Condition;
-import org.apache.nifi.update.attributes.Action;
-import org.apache.nifi.update.attributes.Rule;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
@@ -56,8 +51,7 @@ public class TestUpdateAttribute {
     }
 
     private Criteria getCriteria() {
-        final Criteria criteria = new Criteria();
-        return criteria;
+        return new Criteria();
     }
 
     private void addRule(final Criteria criteria, final String name, final 
Collection<String> conditions, final Map<String, String> actions) {
@@ -205,8 +199,8 @@ public class TestUpdateAttribute {
         addRule(criteria, "rule", Arrays.asList(
                 // conditions
                 "${attribute.1:equals('value.1')}"), getMap(
-                        // actions
-                        "attribute.2", "value.2"));
+                // actions
+                "attribute.2", "value.2"));
 
         final TestRunner runner = TestRunners.newTestRunner(new 
UpdateAttribute());
         runner.setAnnotationData(serialize(criteria));
@@ -230,8 +224,8 @@ public class TestUpdateAttribute {
         addRule(criteria, "rule", Arrays.asList(
                 // conditions
                 "${attribute.1:equals('value.1')}"), getMap(
-                        // actions
-                        "attribute.2", "value.2"));
+                // actions
+                "attribute.2", "value.2"));
 
         final TestRunner runner = TestRunners.newTestRunner(new 
UpdateAttribute());
         runner.setAnnotationData(serialize(criteria));
@@ -358,8 +352,8 @@ public class TestUpdateAttribute {
         addRule(criteria, "rule 3", Arrays.asList(
                 // conditions
                 "${attribute.1:equals('value.1')}"), getMap(
-                        // actions
-                        "attribute.2", "value.3"));
+                // actions
+                "attribute.2", "value.3"));
 
         final TestRunner runner = TestRunners.newTestRunner(new 
UpdateAttribute());
         runner.setAnnotationData(serialize(criteria));
@@ -422,4 +416,88 @@ public class TestUpdateAttribute {
         // ensure the attributes are as expected
         flowfile.assertAttributeEquals("default.attr", "-more-stuff");
     }
+
+    @Test
+    public void testSimpleDelete() {
+        final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
+        runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute.2");
+
+        final Map<String, String> incoming = new HashMap<>();
+        incoming.put("attribute.1", "value.1");
+        incoming.put("attribute.2", "value.2");
+        runner.enqueue(new byte[0], incoming);
+        runner.run();
+
+        // Only attributes whose whole name matches the regex are removed.
+        runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("attribute.1", "value.1");
+        out.assertAttributeNotExists("attribute.2");
+    }
+
+    @Test
+    public void testRegexDotDelete() {
+        final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
+        // An unescaped '.' matches any character, so "attributex2" is deleted as well.
+        runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute.2");
+
+        final Map<String, String> incoming = new HashMap<>();
+        incoming.put("attribute.1", "value.1");
+        incoming.put("attribute.2", "value.2");
+        incoming.put("attributex2", "valuex2");
+        runner.enqueue(new byte[0], incoming);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("attribute.1", "value.1");
+        out.assertAttributeNotExists("attribute.2");
+        out.assertAttributeNotExists("attributex2");
+    }
+
+    @Test
+    public void testRegexLiteralDotDelete() {
+        final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
+        // An escaped '.' matches only a literal dot, so "attributex2" must survive.
+        runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute\\.2");
+
+        final Map<String, String> incoming = new HashMap<>();
+        incoming.put("attribute.1", "value.1");
+        incoming.put("attribute.2", "value.2");
+        incoming.put("attributex2", "valuex2");
+        runner.enqueue(new byte[0], incoming);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("attribute.1", "value.1");
+        out.assertAttributeNotExists("attribute.2");
+        out.assertAttributeExists("attributex2");
+    }
+
+    @Test
+    public void testRegexGroupDelete() {
+        final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
+        // Alternation: "attribute.2".."attribute.5" (literal dot) or any name starting with "sample".
+        runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "(attribute\\.[2-5]|sample.*)");
+
+        final Map<String, String> incoming = new HashMap<>();
+        incoming.put("attribute.1", "value.1");
+        incoming.put("attribute.2", "value.2");
+        incoming.put("attribute.6", "value.6");
+        incoming.put("sampleSize", "value.size");
+        incoming.put("sample.1", "value.sample.1");
+        incoming.put("simple.1", "value.simple.1");
+        runner.enqueue(new byte[0], incoming);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(0);
+        out.assertAttributeEquals("attribute.1", "value.1");
+        out.assertAttributeNotExists("attribute.2");
+        out.assertAttributeExists("attribute.6");
+        out.assertAttributeNotExists("sampleSize");
+        out.assertAttributeNotExists("sample.1");
+        out.assertAttributeExists("simple.1");
+    }
 }

Reply via email to