Repository: nifi Updated Branches: refs/heads/master dbf259508 -> 0831059d2
NIFI-5287 Made LookupRecord able to take in flowfile attributes and combine them with lookup keys. NIFI-5287 Removed unneeded property descriptor. NIFI-5287 Added additional changes from a code review. This closes #2777. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0831059d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0831059d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0831059d Branch: refs/heads/master Commit: 0831059d2caca366b208df72d3160ee825f78c98 Parents: dbf2595 Author: Mike Thomsen <mikerthom...@gmail.com> Authored: Sat Jun 9 13:43:24 2018 -0400 Committer: Koji Kawamura <ijokaruma...@apache.org> Committed: Thu Jun 14 10:54:44 2018 +0900 ---------------------------------------------------------------------- .../processors/standard/LookupAttribute.java | 3 +- .../nifi/processors/standard/LookupRecord.java | 26 +++++----- .../standard/TestLookupAttribute.java | 52 ++++++++++++++++++++ .../processors/standard/TestLookupRecord.java | 47 ++++++++++++++++++ .../org/apache/nifi/lookup/LookupService.java | 13 +++++ 5 files changed, 127 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java index 56ad58f..20ea00f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java @@ -222,7 +222,8 @@ public class LookupAttribute extends AbstractProcessor { final PropertyValue lookupKeyExpression = e.getValue(); final String lookupKey = lookupKeyExpression.evaluateAttributeExpressions(flowFile).getValue(); final String attributeName = e.getKey().getName(); - final Optional<String> attributeValue = lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey)); + final Optional<String> attributeValue = lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey), + flowFile.getAttributes()); matched = putAttribute(attributeName, attributeValue, attributes, includeEmptyValues, logger) || matched; if (!matched && logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 589272f..c687f74 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -17,18 +17,6 @@ package org.apache.nifi.processors.standard; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -61,6 +49,18 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.Tuple; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + @EventDriven @SideEffectFree @@ -292,7 +292,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa final Optional<?> lookupValueOption; try { - lookupValueOption = lookupService.lookup(lookupCoordinates); + lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes()); } catch (final Exception e) { throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java index 1cf41b8..3f9d2fa 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Set; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; import org.apache.nifi.lookup.SimpleKeyValueLookupService; import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.reporting.InitializationException; @@ -31,6 +33,7 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertNotNull; @@ -140,6 +143,27 @@ public class TestLookupAttribute { runner.assertNotValid(); } + @Test + public void testLookupServicePassFlowfileAttributes() throws InitializationException { + final LookupService service = new TestService(); + + final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute()); + runner.addControllerService("simple-key-value-lookup-service", service); + runner.enableControllerService(service); + runner.assertValid(service); + runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service"); + runner.setProperty(LookupAttribute.INCLUDE_EMPTY_VALUES, "false"); + runner.setProperty("baz", "${attr1}"); + runner.assertValid(); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("user_defined", "key4"); + + runner.enqueue("some content".getBytes(), attributes); + runner.run(1, false); + runner.assertAllFlowFilesTransferred(LookupAttribute.REL_MATCHED, 1); + } + private static class InvalidLookupService extends AbstractControllerService implements StringLookupService { @Override public Optional<String> lookup(Map<String, Object> coordinates) { @@ -155,4 +179,32 @@ public class TestLookupAttribute { } } + static class TestService extends AbstractControllerService implements StringLookupService { + @Override + public Optional<String> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException { + Assert.assertNotNull(coordinates); + Assert.assertNotNull(context); + Assert.assertEquals(1, coordinates.size()); + Assert.assertTrue(context.containsKey("user_defined")); + + return Optional.of("Test!"); + } + + @Override + public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException { + return Optional.empty(); + } + + @Override + public Class<?> getValueType() { + return String.class; + } + + @Override + public Set<String> getRequiredKeys() { + Set set = new HashSet(); + set.add("key"); + return set; + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index 8cdce71..30b2b24 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -44,6 +44,7 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -85,6 +86,32 @@ public class TestLookupRecord { } @Test + public void testFlowfileAttributesPassed() { + Map<String, String> attrs = new HashMap<>(); + attrs.put("schema.name", "person"); + attrs.put("something_something", "test"); + + Map<String, Object> expected = new HashMap<>(); + expected.putAll(attrs); + + lookupService.setExpectedContext(expected); + + lookupService.addValue("John Doe", "Soccer"); + lookupService.addValue("Jane Doe", "Basketball"); + lookupService.addValue("Jimmy Doe", "Football"); + + runner.enqueue("", attrs); + runner.run(); + + runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0); + + out.assertAttributeEquals("record.count", "3"); + out.assertAttributeEquals("mime.type", "text/plain"); + out.assertContentEquals("John Doe,48,Soccer\nJane Doe,47,Basketball\nJimmy Doe,14,Football\n"); + } + + @Test public void testAllMatch() throws InitializationException { lookupService.addValue("John Doe", "Soccer"); lookupService.addValue("Jane Doe", "Basketball"); @@ -372,6 +399,7 @@ public class TestLookupRecord { private static class MapLookup extends AbstractControllerService implements StringLookupService { private final Map<String, String> values = new HashMap<>(); + private Map<String, Object> expectedContext; public void addValue(final String key, final String value) { values.put(key, value); @@ -382,6 +410,11 @@ public class TestLookupRecord { return String.class; } + public Optional<String> lookup(final Map<String, Object> coordinates, Map<String, String> context) { + validateContext(context); + return lookup(coordinates); + } + @Override public Optional<String> lookup(final Map<String, Object> coordinates) { if (coordinates == null || coordinates.get("lookup") == null) { @@ -400,6 +433,20 @@ public class TestLookupRecord { public Set<String> getRequiredKeys() { return Collections.singleton("lookup"); } + + public void setExpectedContext(Map<String, Object> expectedContext) { + this.expectedContext = expectedContext; + } + + private void validateContext(Map<String, String> context) { + if (expectedContext != null) { + for (Map.Entry<String, Object> entry : expectedContext.entrySet()) { + Assert.assertTrue(String.format("%s was not in coordinates.", entry.getKey()), + context.containsKey(entry.getKey())); + Assert.assertEquals("Wrong value", entry.getValue(), context.get(entry.getKey())); + } + } + } } private static class RecordLookup extends AbstractControllerService implements RecordLookupService { http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java index a1f904d..6ef1a5a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java @@ -36,6 +36,19 @@ public interface LookupService<T> extends ControllerService { Optional<T> lookup(Map<String, Object> coordinates) throws LookupFailureException; /** + * Looks up a value that corresponds to the given map, coordinates. Additional contextual information will also be passed into the + * map labeled context from sources such as flowfile attributes. + * + * @param coordinates a Map of key/value pairs that indicate the information that should be looked up + * @param context a Map of additional information + * @return a value that corresponds to the given coordinates + * @throws LookupFailureException if unable to lookup a value for the given coordinates + */ + default Optional<T> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException { + return lookup(coordinates); + } + + /** * @return the Class that represents the type of value that will be returned by {@link #lookup(Map)} */ Class<?> getValueType();