Repository: nifi
Updated Branches:
  refs/heads/master dbf259508 -> 0831059d2


NIFI-5287 Made LookupRecord able to take in flowfile attributes and combine 
them with lookup keys.

NIFI-5287 Removed unneeded property descriptor.

NIFI-5287 Added additional changes from a code review.

This closes #2777.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0831059d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0831059d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0831059d

Branch: refs/heads/master
Commit: 0831059d2caca366b208df72d3160ee825f78c98
Parents: dbf2595
Author: Mike Thomsen <mikerthom...@gmail.com>
Authored: Sat Jun 9 13:43:24 2018 -0400
Committer: Koji Kawamura <ijokaruma...@apache.org>
Committed: Thu Jun 14 10:54:44 2018 +0900

----------------------------------------------------------------------
 .../processors/standard/LookupAttribute.java    |  3 +-
 .../nifi/processors/standard/LookupRecord.java  | 26 +++++-----
 .../standard/TestLookupAttribute.java           | 52 ++++++++++++++++++++
 .../processors/standard/TestLookupRecord.java   | 47 ++++++++++++++++++
 .../org/apache/nifi/lookup/LookupService.java   | 13 +++++
 5 files changed, 127 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java
index 56ad58f..20ea00f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java
@@ -222,7 +222,8 @@ public class LookupAttribute extends AbstractProcessor {
                 final PropertyValue lookupKeyExpression = e.getValue();
                 final String lookupKey = 
lookupKeyExpression.evaluateAttributeExpressions(flowFile).getValue();
                 final String attributeName = e.getKey().getName();
-                final Optional<String> attributeValue = 
lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey));
+                final Optional<String> attributeValue = 
lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey),
+                        flowFile.getAttributes());
                 matched = putAttribute(attributeName, attributeValue, 
attributes, includeEmptyValues, logger) || matched;
 
                 if (!matched && logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 589272f..c687f74 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -17,18 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -61,6 +49,18 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.Tuple;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 
 @EventDriven
 @SideEffectFree
@@ -292,7 +292,7 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<Map<String, RecordPa
 
         final Optional<?> lookupValueOption;
         try {
-            lookupValueOption = lookupService.lookup(lookupCoordinates);
+            lookupValueOption = lookupService.lookup(lookupCoordinates, 
flowFile.getAttributes());
         } catch (final Exception e) {
             throw new ProcessException("Failed to lookup coordinates " + 
lookupCoordinates + " in Lookup Service", e);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java
index 1cf41b8..3f9d2fa 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
 import org.apache.nifi.lookup.SimpleKeyValueLookupService;
 import org.apache.nifi.lookup.StringLookupService;
 import org.apache.nifi.reporting.InitializationException;
@@ -31,6 +33,7 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertNotNull;
@@ -140,6 +143,27 @@ public class TestLookupAttribute {
         runner.assertNotValid();
     }
 
+    @Test
+    public void testLookupServicePassFlowfileAttributes() throws 
InitializationException {
+        final LookupService service = new TestService();
+
+        final TestRunner runner = TestRunners.newTestRunner(new 
LookupAttribute());
+        runner.addControllerService("simple-key-value-lookup-service", 
service);
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+        runner.setProperty(LookupAttribute.LOOKUP_SERVICE, 
"simple-key-value-lookup-service");
+        runner.setProperty(LookupAttribute.INCLUDE_EMPTY_VALUES, "false");
+        runner.setProperty("baz", "${attr1}");
+        runner.assertValid();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("user_defined", "key4");
+
+        runner.enqueue("some content".getBytes(), attributes);
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(LookupAttribute.REL_MATCHED, 1);
+    }
+
     private static class InvalidLookupService extends 
AbstractControllerService implements StringLookupService {
       @Override
       public Optional<String> lookup(Map<String, Object> coordinates) {
@@ -155,4 +179,32 @@ public class TestLookupAttribute {
       }
     }
 
+    static class TestService extends AbstractControllerService implements 
StringLookupService {
+        @Override
+        public Optional<String> lookup(Map<String, Object> coordinates, 
Map<String, String> context) throws LookupFailureException {
+            Assert.assertNotNull(coordinates);
+            Assert.assertNotNull(context);
+            Assert.assertEquals(1, coordinates.size());
+            Assert.assertTrue(context.containsKey("user_defined"));
+
+            return Optional.of("Test!");
+        }
+
+        @Override
+        public Optional<String> lookup(Map<String, Object> coordinates) throws 
LookupFailureException {
+            return Optional.empty();
+        }
+
+        @Override
+        public Class<?> getValueType() {
+            return String.class;
+        }
+
+        @Override
+        public Set<String> getRequiredKeys() {
+            Set set = new HashSet();
+            set.add("key");
+            return set;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index 8cdce71..30b2b24 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -44,6 +44,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -85,6 +86,32 @@ public class TestLookupRecord {
     }
 
     @Test
+    public void testFlowfileAttributesPassed() {
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("schema.name", "person");
+        attrs.put("something_something", "test");
+
+        Map<String, Object> expected = new HashMap<>();
+        expected.putAll(attrs);
+
+        lookupService.setExpectedContext(expected);
+
+        lookupService.addValue("John Doe", "Soccer");
+        lookupService.addValue("Jane Doe", "Basketball");
+        lookupService.addValue("Jimmy Doe", "Football");
+
+        runner.enqueue("", attrs);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("John Doe,48,Soccer\nJane 
Doe,47,Basketball\nJimmy Doe,14,Football\n");
+    }
+
+    @Test
     public void testAllMatch() throws InitializationException {
         lookupService.addValue("John Doe", "Soccer");
         lookupService.addValue("Jane Doe", "Basketball");
@@ -372,6 +399,7 @@ public class TestLookupRecord {
 
     private static class MapLookup extends AbstractControllerService 
implements StringLookupService {
         private final Map<String, String> values = new HashMap<>();
+        private Map<String, Object> expectedContext;
 
         public void addValue(final String key, final String value) {
             values.put(key, value);
@@ -382,6 +410,11 @@ public class TestLookupRecord {
             return String.class;
         }
 
+        public Optional<String> lookup(final Map<String, Object> coordinates, 
Map<String, String> context) {
+            validateContext(context);
+            return lookup(coordinates);
+        }
+
         @Override
         public Optional<String> lookup(final Map<String, Object> coordinates) {
             if (coordinates == null || coordinates.get("lookup") == null) {
@@ -400,6 +433,20 @@ public class TestLookupRecord {
         public Set<String> getRequiredKeys() {
             return Collections.singleton("lookup");
         }
+
+        public void setExpectedContext(Map<String, Object> expectedContext) {
+            this.expectedContext = expectedContext;
+        }
+
+        private void validateContext(Map<String, String> context) {
+            if (expectedContext != null) {
+                for (Map.Entry<String, Object> entry : 
expectedContext.entrySet()) {
+                    Assert.assertTrue(String.format("%s was not in 
coordinates.", entry.getKey()),
+                            context.containsKey(entry.getKey()));
+                    Assert.assertEquals("Wrong value", entry.getValue(), 
context.get(entry.getKey()));
+                }
+            }
+        }
     }
 
     private static class RecordLookup extends AbstractControllerService 
implements RecordLookupService {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0831059d/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
index a1f904d..6ef1a5a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
@@ -36,6 +36,19 @@ public interface LookupService<T> extends ControllerService {
     Optional<T> lookup(Map<String, Object> coordinates) throws 
LookupFailureException;
 
     /**
+     * Looks up a value that corresponds to the given coordinates. 
Additional contextual information, drawn from sources such as flowfile
+     * attributes, is supplied in the context map.
+     *
+     * @param coordinates a Map of key/value pairs that indicate the 
information that should be looked up
+     * @param context a Map of additional information
+     * @return a value that corresponds to the given coordinates
+     * @throws LookupFailureException if unable to lookup a value for the 
given coordinates
+     */
+    default Optional<T> lookup(Map<String, Object> coordinates, Map<String, 
String> context) throws LookupFailureException {
+        return lookup(coordinates);
+    }
+
+    /**
      * @return the Class that represents the type of value that will be 
returned by {@link #lookup(Map)}
      */
     Class<?> getValueType();

Reply via email to