NIFI-1083: Added a Grouping Regular Expression property for grouping lines of text


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e7c6c7ca
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e7c6c7ca
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e7c6c7ca

Branch: refs/heads/master
Commit: e7c6c7cae9a726cd3338e0af1879983822bd5a00
Parents: 3e538d9
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Nov 5 10:36:38 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Nov 11 10:19:07 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/RouteText.java     | 170 +++++++++++++++++--
 .../nifi/processors/standard/TestRouteText.java | 133 ++++++++++++++-
 2 files changed, 284 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e7c6c7ca/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
index 81e879b..5ba2a98 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
@@ -32,8 +32,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.DynamicRelationship;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -74,6 +76,7 @@ import org.apache.nifi.processors.standard.util.NLKBufferedReader;
 public class RouteText extends AbstractProcessor {
 
     public static final String ROUTE_ATTRIBUTE_KEY = "RouteText.Route";
+    public static final String GROUP_ATTRIBUTE_KEY = "RouteText.Group";
 
     private static final String routeAllMatchValue = "Route to 'matched' if 
line matches all conditions";
     private static final String routeAnyMatchValue = "Route to 'matched' if 
lines matches any condition";
@@ -127,7 +130,7 @@ public class RouteText extends AbstractProcessor {
 
     public static final PropertyDescriptor TRIM_WHITESPACE = new 
PropertyDescriptor.Builder()
         .name("Ignore Leading/Trailing Whitespace")
-        .description("Indicates whether or the whitespace at the beginning and 
end of the lines should be ignored when evaluating the line.")
+        .description("Indicates whether or not the whitespace at the beginning 
and end of the lines should be ignored when evaluating the line.")
         .required(true)
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
         .defaultValue("true")
@@ -143,6 +146,18 @@ public class RouteText extends AbstractProcessor {
         .required(true)
         .build();
 
+    static final PropertyDescriptor GROUPING_REGEX = new 
PropertyDescriptor.Builder()
+        .name("Grouping Regular Expression")
+        .description("Specifies a Regular Expression to evaluate against each 
line to determine which Group the line should be placed in. "
+            + "The Regular Expression must have at least one Capturing Group 
that defines the line's Group. If multiple Capturing Groups exist in the 
Regular Expression, the Group from all "
+            + "Capturing Groups. Two lines will not be placed into the same 
FlowFile unless the they both have the same value for the Group "
+            + "(or neither line matches the Regular Expression). For example, 
to group together all lines in a CSV File by the first column, we can set this 
value to \"(.*?),.*\". "
+            + "Two lines that have the same Group but different Relationships 
will never be placed into the same FlowFile.")
+        .addValidator(StandardValidators.createRegexValidator(1, 
Integer.MAX_VALUE, false))
+        .expressionLanguageSupported(false)
+        .required(false)
+        .build();
+
     public static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
         .name("Character Set")
         .description("The Character Set in which the incoming text is encoded")
@@ -164,6 +179,8 @@ public class RouteText extends AbstractProcessor {
         .description("Data that satisfies the required user-defined rules will 
be routed to this Relationship")
         .build();
 
+    private static Group EMPTY_GROUP = new Group(Collections.<String> 
emptyList());
+
     private AtomicReference<Set<Relationship>> relationships = new 
AtomicReference<>();
     private List<PropertyDescriptor> properties;
     private volatile String configuredRouteStrategy = 
ROUTE_STRATEGY.getDefaultValue();
@@ -174,6 +191,7 @@ public class RouteText extends AbstractProcessor {
      * {@link #onTrigger(ProcessContext, ProcessSession)}
      */
     private volatile Map<Relationship, PropertyValue> propertyMap = new 
HashMap<>();
+    private volatile Pattern groupingRegex = null;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -188,6 +206,7 @@ public class RouteText extends AbstractProcessor {
         properties.add(CHARACTER_SET);
         properties.add(TRIM_WHITESPACE);
         properties.add(IGNORE_CASE);
+        properties.add(GROUPING_REGEX);
         this.properties = Collections.unmodifiableList(properties);
     }
 
@@ -239,6 +258,7 @@ public class RouteText extends AbstractProcessor {
             newRelationships.add(REL_MATCH);
         }
 
+        newRelationships.add(REL_ORIGINAL);
         newRelationships.add(REL_NO_MATCH);
         this.relationships.set(newRelationships);
     }
@@ -251,6 +271,11 @@ public class RouteText extends AbstractProcessor {
      */
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
+        final String regex = context.getProperty(GROUPING_REGEX).getValue();
+        if (regex != null) {
+            groupingRegex = Pattern.compile(regex);
+        }
+
         final Map<Relationship, PropertyValue> newPropertyMap = new 
HashMap<>();
         for (final PropertyDescriptor descriptor : 
context.getProperties().keySet()) {
             if (!descriptor.isDynamic()) {
@@ -326,21 +351,43 @@ public class RouteText extends AbstractProcessor {
             propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : 
value);
         }
 
-        final Map<Relationship, FlowFile> flowFileMap = new HashMap<>();
+        final Map<Relationship, Map<Group, FlowFile>> flowFileMap = new 
HashMap<>();
+        final Pattern groupPattern = groupingRegex;
 
         session.read(originalFlowFile, new InputStreamCallback() {
             @Override
             public void process(final InputStream in) throws IOException {
-                try (final Reader inReader = new InputStreamReader(in,charset);
+                try (final Reader inReader = new InputStreamReader(in, 
charset);
                     final NLKBufferedReader reader = new 
NLKBufferedReader(inReader)) {
 
                     String line;
                     while ((line = reader.readLine()) != null) {
 
+                        final String matchLine;
+                        if (trim) {
+                            matchLine = line.trim();
+                        } else {
+                            // Always trim off the new-line and carriage 
return characters before evaluating the line.
+                            // The NLKBufferedReader maintains these 
characters so that when we write the line out we can maintain
+                            // these characters. However, we don't actually 
want to match against these characters.
+                            final String lineWithoutEndings;
+                            final int indexOfCR = line.indexOf("\r");
+                            final int indexOfNL = line.indexOf("\n");
+                            if (indexOfCR > 0 && indexOfNL > 0) {
+                                lineWithoutEndings = line.substring(0, 
Math.min(indexOfCR, indexOfNL));
+                            } else if (indexOfCR > 0) {
+                                lineWithoutEndings = line.substring(0, 
indexOfCR);
+                            } else if (indexOfNL > 0) {
+                                lineWithoutEndings = line.substring(0, 
indexOfNL);
+                            } else {
+                                lineWithoutEndings = line;
+                            }
+
+                            matchLine = lineWithoutEndings;
+                        }
+
                         int propertiesThatMatchedLine = 0;
                         for (final Map.Entry<Relationship, Object> entry : 
propValueMap.entrySet()) {
-
-                            String matchLine = trim ? line.trim() : line;
                             boolean lineMatchesProperty = 
lineMatches(matchLine, entry.getValue(), 
context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase);
                             if (lineMatchesProperty) {
                                 propertiesThatMatchedLine++;
@@ -349,7 +396,9 @@ public class RouteText extends AbstractProcessor {
                             if (lineMatchesProperty && 
ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) {
                                 // route each individual line to each 
Relationship that matches. This one matches.
                                 final Relationship relationship = 
entry.getKey();
-                                appendLine(session, flowFileMap, relationship, 
originalFlowFile, line, charset);
+
+                                final Group group = getGroup(matchLine, 
groupPattern);
+                                appendLine(session, flowFileMap, relationship, 
originalFlowFile, line, charset, group);
                                 continue;
                             }
 
@@ -377,18 +426,31 @@ public class RouteText extends AbstractProcessor {
                         }
 
                         if (relationship != null) {
-                            appendLine(session, flowFileMap, relationship, 
originalFlowFile, line, charset);
+                            final Group group = getGroup(matchLine, 
groupPattern);
+                            appendLine(session, flowFileMap, relationship, 
originalFlowFile, line, charset, group);
                         }
                     }
                 }
             }
         });
 
-        for (final Map.Entry<Relationship, FlowFile> entry : 
flowFileMap.entrySet()) {
-            logger.info("Created {} from {}; routing to relationship {}", new 
Object[] {entry.getValue(), originalFlowFile, entry.getKey()});
-            FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), 
ROUTE_ATTRIBUTE_KEY, entry.getKey().getName());
-            session.getProvenanceReporter().route(updatedFlowFile, 
entry.getKey());
-            session.transfer(updatedFlowFile, entry.getKey());
+        for (final Map.Entry<Relationship, Map<Group, FlowFile>> entry : 
flowFileMap.entrySet()) {
+            final Relationship relationship = entry.getKey();
+            final Map<Group, FlowFile> groupToFlowFileMap = entry.getValue();
+
+            for (final Map.Entry<Group, FlowFile> flowFileEntry : 
groupToFlowFileMap.entrySet()) {
+                final Group group = flowFileEntry.getKey();
+                final FlowFile flowFile = flowFileEntry.getValue();
+
+                final Map<String, String> attributes = new HashMap<>(2);
+                attributes.put(ROUTE_ATTRIBUTE_KEY, relationship.getName());
+                attributes.put(GROUP_ATTRIBUTE_KEY, 
StringUtils.join(group.getCapturedValues(), ", "));
+
+                logger.info("Created {} from {}; routing to relationship {}", 
new Object[] {flowFile, originalFlowFile, relationship.getName()});
+                FlowFile updatedFlowFile = session.putAllAttributes(flowFile, 
attributes);
+                session.getProvenanceReporter().route(updatedFlowFile, 
entry.getKey());
+                session.transfer(updatedFlowFile, entry.getKey());
+            }
         }
 
         // now transfer the original flow file
@@ -400,9 +462,33 @@ public class RouteText extends AbstractProcessor {
     }
 
 
-    private void appendLine(final ProcessSession session, final 
Map<Relationship, FlowFile> flowFileMap,
-        final Relationship relationship, final FlowFile original, final String 
line, final Charset charset) {
-        FlowFile flowFile = flowFileMap.get(relationship);
+    private Group getGroup(final String line, final Pattern groupPattern) {
+        if (groupPattern == null) {
+            return EMPTY_GROUP;
+        } else {
+            final Matcher matcher = groupPattern.matcher(line);
+            if (matcher.matches()) {
+                final List<String> capturingGroupValues = new 
ArrayList<>(matcher.groupCount());
+                for (int i = 1; i <= matcher.groupCount(); i++) {
+                    capturingGroupValues.add(matcher.group(i));
+                }
+                return new Group(capturingGroupValues);
+            } else {
+                return EMPTY_GROUP;
+            }
+        }
+    }
+
+    private void appendLine(final ProcessSession session, final 
Map<Relationship, Map<Group, FlowFile>> flowFileMap, final Relationship 
relationship,
+        final FlowFile original, final String line, final Charset charset, 
final Group group) {
+
+        Map<Group, FlowFile> groupToFlowFileMap = 
flowFileMap.get(relationship);
+        if (groupToFlowFileMap == null) {
+            groupToFlowFileMap = new HashMap<>();
+            flowFileMap.put(relationship, groupToFlowFileMap);
+        }
+
+        FlowFile flowFile = groupToFlowFileMap.get(group);
         if (flowFile == null) {
             flowFile = session.create(original);
         }
@@ -414,7 +500,7 @@ public class RouteText extends AbstractProcessor {
             }
         });
 
-        flowFileMap.put(relationship, flowFile);
+        groupToFlowFileMap.put(group, flowFile);
     }
 
 
@@ -436,4 +522,56 @@ public class RouteText extends AbstractProcessor {
 
         return false;
     }
+
+
+    private static class Group {
+        private final List<String> capturedValues;
+
+        public Group(final List<String> capturedValues) {
+            this.capturedValues = capturedValues;
+        }
+
+        public List<String> getCapturedValues() {
+            return capturedValues;
+        }
+
+        @Override
+        public String toString() {
+            return "Group" + capturedValues;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((capturedValues == null) ? 0 : 
capturedValues.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+
+            if (obj == null) {
+                return false;
+            }
+
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+
+            Group other = (Group) obj;
+            if (capturedValues == null) {
+                if (other.capturedValues != null) {
+                    return false;
+                }
+            } else if (!capturedValues.equals(other.capturedValues)) {
+                return false;
+            }
+
+            return true;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e7c6c7ca/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java
index 432aaa7..0cd0034 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.nifi.processor.Relationship;
@@ -45,7 +46,7 @@ public class TestRouteText {
         runner.run();
 
         Set<Relationship> relationshipSet = 
runner.getProcessor().getRelationships();
-        Set<String> expectedRelationships = new 
HashSet<>(Arrays.asList("matched", "unmatched"));
+        Set<String> expectedRelationships = new 
HashSet<>(Arrays.asList("matched", "unmatched", "original"));
 
         assertEquals(expectedRelationships.size(), relationshipSet.size());
         for (Relationship relationship : relationshipSet) {
@@ -56,7 +57,7 @@ public class TestRouteText {
         runner.setProperty(RouteText.ROUTE_STRATEGY, 
RouteText.ROUTE_TO_MATCHING_PROPERTY_NAME);
 
         relationshipSet = runner.getProcessor().getRelationships();
-        expectedRelationships = new HashSet<>(Arrays.asList("simple", 
"unmatched"));
+        expectedRelationships = new HashSet<>(Arrays.asList("simple", 
"unmatched", "original"));
 
         assertEquals(expectedRelationships.size(), relationshipSet.size());
         for (Relationship relationship : relationshipSet) {
@@ -81,7 +82,7 @@ public class TestRouteText {
         runner.setProperty("simple", "start");
 
         Set<Relationship> relationshipSet = 
runner.getProcessor().getRelationships();
-        Set<String> expectedRelationships = new 
HashSet<>(Arrays.asList("simple", "unmatched"));
+        Set<String> expectedRelationships = new 
HashSet<>(Arrays.asList("simple", "unmatched", "original"));
 
         assertEquals(expectedRelationships.size(), relationshipSet.size());
         for (Relationship relationship : relationshipSet) {
@@ -175,6 +176,132 @@ public class TestRouteText {
     }
 
     @Test
+    public void testGroupSameRelationship() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new RouteText());
+        runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
+        runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),.*");
+        runner.setProperty("o", "o");
+
+        final String originalText = "1,hello\n2,world\n1,good-bye";
+        runner.enqueue(originalText.getBytes("UTF-8"));
+        runner.run();
+
+        runner.assertTransferCount("o", 2);
+        runner.assertTransferCount("unmatched", 0);
+        runner.assertTransferCount("original", 1);
+
+        final List<MockFlowFile> list = 
runner.getFlowFilesForRelationship("o");
+
+        boolean found1 = false;
+        boolean found2 = false;
+
+        for (final MockFlowFile mff : list) {
+            if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
+                mff.assertContentEquals("1,hello\n1,good-bye");
+                found1 = true;
+            } else {
+                mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "2");
+                mff.assertContentEquals("2,world\n");
+                found2 = true;
+            }
+        }
+
+        assertTrue(found1);
+        assertTrue(found2);
+    }
+
+    @Test
+    public void testMultipleGroupsSameRelationship() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new RouteText());
+        runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
+        runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),(.*?),.*");
+        runner.setProperty("o", "o");
+
+        final String originalText = 
"1,5,hello\n2,5,world\n1,8,good-bye\n1,5,overt";
+        runner.enqueue(originalText.getBytes("UTF-8"));
+        runner.run();
+
+        runner.assertTransferCount("o", 3);
+        runner.assertTransferCount("unmatched", 0);
+        runner.assertTransferCount("original", 1);
+
+        final List<MockFlowFile> list = 
runner.getFlowFilesForRelationship("o");
+
+        boolean found1 = false;
+        boolean found2 = false;
+        boolean found3 = false;
+
+        for (final MockFlowFile mff : list) {
+            if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1, 
5")) {
+                mff.assertContentEquals("1,5,hello\n1,5,overt");
+                found1 = true;
+            } else if 
(mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("2, 5")) {
+                mff.assertContentEquals("2,5,world\n");
+                found2 = true;
+            } else {
+                mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "1, 
8");
+                mff.assertContentEquals("1,8,good-bye\n");
+                found3 = true;
+            }
+        }
+
+        assertTrue(found1);
+        assertTrue(found2);
+        assertTrue(found3);
+    }
+
+    @Test
+    public void testGroupDifferentRelationships() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new RouteText());
+        runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
+        runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),.*");
+        runner.setProperty("l", "l");
+
+        final String originalText = "1,hello\n2,world\n1,good-bye\n3,ciao";
+        runner.enqueue(originalText.getBytes("UTF-8"));
+        runner.run();
+
+        runner.assertTransferCount("l", 2);
+        runner.assertTransferCount("unmatched", 2);
+        runner.assertTransferCount("original", 1);
+
+        List<MockFlowFile> lFlowFiles = 
runner.getFlowFilesForRelationship("l");
+        boolean found1 = false;
+        boolean found2 = false;
+        for (final MockFlowFile mff : lFlowFiles) {
+            if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
+                mff.assertContentEquals("1,hello\n");
+                found1 = true;
+            } else {
+                mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "2");
+                mff.assertContentEquals("2,world\n");
+                found2 = true;
+            }
+        }
+
+        assertTrue(found1);
+        assertTrue(found2);
+
+        List<MockFlowFile> unmatchedFlowFiles = 
runner.getFlowFilesForRelationship("unmatched");
+        found1 = false;
+        boolean found3 = false;
+        for (final MockFlowFile mff : unmatchedFlowFiles) {
+            if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
+                mff.assertContentEquals("1,good-bye\n");
+                found1 = true;
+            } else {
+                mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "3");
+                mff.assertContentEquals("3,ciao");
+                found3 = true;
+            }
+        }
+
+        assertTrue(found1);
+        assertTrue(found3);
+
+    }
+
+    @Test
     public void testSimpleDefaultContains() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new RouteText());
         runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);

Reply via email to