NIFI-1083: Added a 'Grouping Regular Expression' property to the RouteText processor for grouping lines of text
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e7c6c7ca Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e7c6c7ca Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e7c6c7ca Branch: refs/heads/master Commit: e7c6c7cae9a726cd3338e0af1879983822bd5a00 Parents: 3e538d9 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Nov 5 10:36:38 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Nov 11 10:19:07 2015 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/RouteText.java | 170 +++++++++++++++++-- .../nifi/processors/standard/TestRouteText.java | 133 ++++++++++++++- 2 files changed, 284 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e7c6c7ca/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java index 81e879b..5ba2a98 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java @@ -32,8 +32,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import 
org.apache.nifi.annotation.behavior.DynamicRelationship; import org.apache.nifi.annotation.behavior.EventDriven; @@ -74,6 +76,7 @@ import org.apache.nifi.processors.standard.util.NLKBufferedReader; public class RouteText extends AbstractProcessor { public static final String ROUTE_ATTRIBUTE_KEY = "RouteText.Route"; + public static final String GROUP_ATTRIBUTE_KEY = "RouteText.Group"; private static final String routeAllMatchValue = "Route to 'matched' if line matches all conditions"; private static final String routeAnyMatchValue = "Route to 'matched' if lines matches any condition"; @@ -127,7 +130,7 @@ public class RouteText extends AbstractProcessor { public static final PropertyDescriptor TRIM_WHITESPACE = new PropertyDescriptor.Builder() .name("Ignore Leading/Trailing Whitespace") - .description("Indicates whether or the whitespace at the beginning and end of the lines should be ignored when evaluating the line.") + .description("Indicates whether or not the whitespace at the beginning and end of the lines should be ignored when evaluating the line.") .required(true) .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .defaultValue("true") @@ -143,6 +146,18 @@ public class RouteText extends AbstractProcessor { .required(true) .build(); + static final PropertyDescriptor GROUPING_REGEX = new PropertyDescriptor.Builder() + .name("Grouping Regular Expression") + .description("Specifies a Regular Expression to evaluate against each line to determine which Group the line should be placed in. " + + "The Regular Expression must have at least one Capturing Group that defines the line's Group. If multiple Capturing Groups exist in the Regular Expression, the Group from all " + + "Capturing Groups. Two lines will not be placed into the same FlowFile unless the they both have the same value for the Group " + + "(or neither line matches the Regular Expression). For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\". 
" + + "Two lines that have the same Group but different Relationships will never be placed into the same FlowFile.") + .addValidator(StandardValidators.createRegexValidator(1, Integer.MAX_VALUE, false)) + .expressionLanguageSupported(false) + .required(false) + .build(); + public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() .name("Character Set") .description("The Character Set in which the incoming text is encoded") @@ -164,6 +179,8 @@ public class RouteText extends AbstractProcessor { .description("Data that satisfies the required user-defined rules will be routed to this Relationship") .build(); + private static Group EMPTY_GROUP = new Group(Collections.<String> emptyList()); + private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>(); private List<PropertyDescriptor> properties; private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue(); @@ -174,6 +191,7 @@ public class RouteText extends AbstractProcessor { * {@link #onTrigger(ProcessContext, ProcessSession)} */ private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>(); + private volatile Pattern groupingRegex = null; @Override protected void init(final ProcessorInitializationContext context) { @@ -188,6 +206,7 @@ public class RouteText extends AbstractProcessor { properties.add(CHARACTER_SET); properties.add(TRIM_WHITESPACE); properties.add(IGNORE_CASE); + properties.add(GROUPING_REGEX); this.properties = Collections.unmodifiableList(properties); } @@ -239,6 +258,7 @@ public class RouteText extends AbstractProcessor { newRelationships.add(REL_MATCH); } + newRelationships.add(REL_ORIGINAL); newRelationships.add(REL_NO_MATCH); this.relationships.set(newRelationships); } @@ -251,6 +271,11 @@ public class RouteText extends AbstractProcessor { */ @OnScheduled public void onScheduled(final ProcessContext context) { + final String regex = context.getProperty(GROUPING_REGEX).getValue(); + if (regex != null) { + 
groupingRegex = Pattern.compile(regex); + } + final Map<Relationship, PropertyValue> newPropertyMap = new HashMap<>(); for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { if (!descriptor.isDynamic()) { @@ -326,21 +351,43 @@ public class RouteText extends AbstractProcessor { propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value); } - final Map<Relationship, FlowFile> flowFileMap = new HashMap<>(); + final Map<Relationship, Map<Group, FlowFile>> flowFileMap = new HashMap<>(); + final Pattern groupPattern = groupingRegex; session.read(originalFlowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - try (final Reader inReader = new InputStreamReader(in,charset); + try (final Reader inReader = new InputStreamReader(in, charset); final NLKBufferedReader reader = new NLKBufferedReader(inReader)) { String line; while ((line = reader.readLine()) != null) { + final String matchLine; + if (trim) { + matchLine = line.trim(); + } else { + // Always trim off the new-line and carriage return characters before evaluating the line. + // The NLKBufferedReader maintains these characters so that when we write the line out we can maintain + // these characters. However, we don't actually want to match against these characters. + final String lineWithoutEndings; + final int indexOfCR = line.indexOf("\r"); + final int indexOfNL = line.indexOf("\n"); + if (indexOfCR > 0 && indexOfNL > 0) { + lineWithoutEndings = line.substring(0, Math.min(indexOfCR, indexOfNL)); + } else if (indexOfCR > 0) { + lineWithoutEndings = line.substring(0, indexOfCR); + } else if (indexOfNL > 0) { + lineWithoutEndings = line.substring(0, indexOfNL); + } else { + lineWithoutEndings = line; + } + + matchLine = lineWithoutEndings; + } + int propertiesThatMatchedLine = 0; for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) { - - String matchLine = trim ? 
line.trim() : line; boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase); if (lineMatchesProperty) { propertiesThatMatchedLine++; @@ -349,7 +396,9 @@ public class RouteText extends AbstractProcessor { if (lineMatchesProperty && ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) { // route each individual line to each Relationship that matches. This one matches. final Relationship relationship = entry.getKey(); - appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset); + + final Group group = getGroup(matchLine, groupPattern); + appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset, group); continue; } @@ -377,18 +426,31 @@ public class RouteText extends AbstractProcessor { } if (relationship != null) { - appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset); + final Group group = getGroup(matchLine, groupPattern); + appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset, group); } } } } }); - for (final Map.Entry<Relationship, FlowFile> entry : flowFileMap.entrySet()) { - logger.info("Created {} from {}; routing to relationship {}", new Object[] {entry.getValue(), originalFlowFile, entry.getKey()}); - FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName()); - session.getProvenanceReporter().route(updatedFlowFile, entry.getKey()); - session.transfer(updatedFlowFile, entry.getKey()); + for (final Map.Entry<Relationship, Map<Group, FlowFile>> entry : flowFileMap.entrySet()) { + final Relationship relationship = entry.getKey(); + final Map<Group, FlowFile> groupToFlowFileMap = entry.getValue(); + + for (final Map.Entry<Group, FlowFile> flowFileEntry : groupToFlowFileMap.entrySet()) { + final Group group = flowFileEntry.getKey(); + final FlowFile flowFile = flowFileEntry.getValue(); + + final Map<String, String> attributes = 
new HashMap<>(2); + attributes.put(ROUTE_ATTRIBUTE_KEY, relationship.getName()); + attributes.put(GROUP_ATTRIBUTE_KEY, StringUtils.join(group.getCapturedValues(), ", ")); + + logger.info("Created {} from {}; routing to relationship {}", new Object[] {flowFile, originalFlowFile, relationship.getName()}); + FlowFile updatedFlowFile = session.putAllAttributes(flowFile, attributes); + session.getProvenanceReporter().route(updatedFlowFile, entry.getKey()); + session.transfer(updatedFlowFile, entry.getKey()); + } } // now transfer the original flow file @@ -400,9 +462,33 @@ public class RouteText extends AbstractProcessor { } - private void appendLine(final ProcessSession session, final Map<Relationship, FlowFile> flowFileMap, - final Relationship relationship, final FlowFile original, final String line, final Charset charset) { - FlowFile flowFile = flowFileMap.get(relationship); + private Group getGroup(final String line, final Pattern groupPattern) { + if (groupPattern == null) { + return EMPTY_GROUP; + } else { + final Matcher matcher = groupPattern.matcher(line); + if (matcher.matches()) { + final List<String> capturingGroupValues = new ArrayList<>(matcher.groupCount()); + for (int i = 1; i <= matcher.groupCount(); i++) { + capturingGroupValues.add(matcher.group(i)); + } + return new Group(capturingGroupValues); + } else { + return EMPTY_GROUP; + } + } + } + + private void appendLine(final ProcessSession session, final Map<Relationship, Map<Group, FlowFile>> flowFileMap, final Relationship relationship, + final FlowFile original, final String line, final Charset charset, final Group group) { + + Map<Group, FlowFile> groupToFlowFileMap = flowFileMap.get(relationship); + if (groupToFlowFileMap == null) { + groupToFlowFileMap = new HashMap<>(); + flowFileMap.put(relationship, groupToFlowFileMap); + } + + FlowFile flowFile = groupToFlowFileMap.get(group); if (flowFile == null) { flowFile = session.create(original); } @@ -414,7 +500,7 @@ public class RouteText extends 
AbstractProcessor { } }); - flowFileMap.put(relationship, flowFile); + groupToFlowFileMap.put(group, flowFile); } @@ -436,4 +522,56 @@ public class RouteText extends AbstractProcessor { return false; } + + + private static class Group { + private final List<String> capturedValues; + + public Group(final List<String> capturedValues) { + this.capturedValues = capturedValues; + } + + public List<String> getCapturedValues() { + return capturedValues; + } + + @Override + public String toString() { + return "Group" + capturedValues; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((capturedValues == null) ? 0 : capturedValues.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + Group other = (Group) obj; + if (capturedValues == null) { + if (other.capturedValues != null) { + return false; + } + } else if (!capturedValues.equals(other.capturedValues)) { + return false; + } + + return true; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e7c6c7ca/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java index 432aaa7..0cd0034 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteText.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.file.Paths; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.nifi.processor.Relationship; @@ -45,7 +46,7 @@ public class TestRouteText { runner.run(); Set<Relationship> relationshipSet = runner.getProcessor().getRelationships(); - Set<String> expectedRelationships = new HashSet<>(Arrays.asList("matched", "unmatched")); + Set<String> expectedRelationships = new HashSet<>(Arrays.asList("matched", "unmatched", "original")); assertEquals(expectedRelationships.size(), relationshipSet.size()); for (Relationship relationship : relationshipSet) { @@ -56,7 +57,7 @@ public class TestRouteText { runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHING_PROPERTY_NAME); relationshipSet = runner.getProcessor().getRelationships(); - expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched")); + expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched", "original")); assertEquals(expectedRelationships.size(), relationshipSet.size()); for (Relationship relationship : relationshipSet) { @@ -81,7 +82,7 @@ public class TestRouteText { runner.setProperty("simple", "start"); Set<Relationship> relationshipSet = runner.getProcessor().getRelationships(); - Set<String> expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched")); + Set<String> expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched", "original")); assertEquals(expectedRelationships.size(), relationshipSet.size()); for (Relationship relationship : relationshipSet) { @@ -175,6 +176,132 @@ public class TestRouteText { } @Test + public void testGroupSameRelationship() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new RouteText()); + 
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS); + runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),.*"); + runner.setProperty("o", "o"); + + final String originalText = "1,hello\n2,world\n1,good-bye"; + runner.enqueue(originalText.getBytes("UTF-8")); + runner.run(); + + runner.assertTransferCount("o", 2); + runner.assertTransferCount("unmatched", 0); + runner.assertTransferCount("original", 1); + + final List<MockFlowFile> list = runner.getFlowFilesForRelationship("o"); + + boolean found1 = false; + boolean found2 = false; + + for (final MockFlowFile mff : list) { + if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) { + mff.assertContentEquals("1,hello\n1,good-bye"); + found1 = true; + } else { + mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "2"); + mff.assertContentEquals("2,world\n"); + found2 = true; + } + } + + assertTrue(found1); + assertTrue(found2); + } + + @Test + public void testMultipleGroupsSameRelationship() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new RouteText()); + runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS); + runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),(.*?),.*"); + runner.setProperty("o", "o"); + + final String originalText = "1,5,hello\n2,5,world\n1,8,good-bye\n1,5,overt"; + runner.enqueue(originalText.getBytes("UTF-8")); + runner.run(); + + runner.assertTransferCount("o", 3); + runner.assertTransferCount("unmatched", 0); + runner.assertTransferCount("original", 1); + + final List<MockFlowFile> list = runner.getFlowFilesForRelationship("o"); + + boolean found1 = false; + boolean found2 = false; + boolean found3 = false; + + for (final MockFlowFile mff : list) { + if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1, 5")) { + mff.assertContentEquals("1,5,hello\n1,5,overt"); + found1 = true; + } else if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("2, 5")) { + mff.assertContentEquals("2,5,world\n"); + found2 = true; + } 
else { + mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "1, 8"); + mff.assertContentEquals("1,8,good-bye\n"); + found3 = true; + } + } + + assertTrue(found1); + assertTrue(found2); + assertTrue(found3); + } + + @Test + public void testGroupDifferentRelationships() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new RouteText()); + runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS); + runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),.*"); + runner.setProperty("l", "l"); + + final String originalText = "1,hello\n2,world\n1,good-bye\n3,ciao"; + runner.enqueue(originalText.getBytes("UTF-8")); + runner.run(); + + runner.assertTransferCount("l", 2); + runner.assertTransferCount("unmatched", 2); + runner.assertTransferCount("original", 1); + + List<MockFlowFile> lFlowFiles = runner.getFlowFilesForRelationship("l"); + boolean found1 = false; + boolean found2 = false; + for (final MockFlowFile mff : lFlowFiles) { + if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) { + mff.assertContentEquals("1,hello\n"); + found1 = true; + } else { + mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "2"); + mff.assertContentEquals("2,world\n"); + found2 = true; + } + } + + assertTrue(found1); + assertTrue(found2); + + List<MockFlowFile> unmatchedFlowFiles = runner.getFlowFilesForRelationship("unmatched"); + found1 = false; + boolean found3 = false; + for (final MockFlowFile mff : unmatchedFlowFiles) { + if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) { + mff.assertContentEquals("1,good-bye\n"); + found1 = true; + } else { + mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "3"); + mff.assertContentEquals("3,ciao"); + found3 = true; + } + } + + assertTrue(found1); + assertTrue(found3); + + } + + @Test public void testSimpleDefaultContains() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new RouteText()); runner.setProperty(RouteText.MATCH_STRATEGY, 
RouteText.CONTAINS);