Repository: nifi
Updated Branches:
  refs/heads/master af2861f10 -> ea3c294f9


NIFI-819 - Extend GetHTTP to use dynamically add HTTP headers to a request

This closes #1462.

Signed-off-by: Aldrin Piri <ald...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ea3c294f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ea3c294f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ea3c294f

Branch: refs/heads/master
Commit: ea3c294f94093d57a47982a7167126c718383a3c
Parents: af2861f
Author: Andre F de Miranda <trix...@users.noreply.github.com>
Authored: Thu Feb 2 10:23:23 2017 +1100
Committer: Aldrin Piri <ald...@apache.org>
Committed: Thu Feb 2 09:56:53 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/GetHTTP.java       | 41 ++++++++++++++++++++
 .../nifi/processors/standard/TestGetHTTP.java   | 37 +++++++++++++++++-
 2 files changed, 76 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ea3c294f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index 7d19566..9914ad7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@ -62,6 +62,8 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
 import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
@@ -72,8 +74,10 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
@@ -102,6 +106,11 @@ import org.apache.nifi.util.Tuple;
     + "once the content has been fetched from the given URL, it will not be 
fetched again until the content on the remote server changes. Note that due to 
limitations on state "
     + "management, stored \"last modified\" and etag fields never expire. If 
the URL in GetHttp uses Expression Language that is unbounded, there "
     + "is the potential for Out of Memory Errors to occur.")
+@DynamicProperties({
+    @DynamicProperty(name = "Header Name", value = "The Expression Language to 
be used to populate the header value", description = "The additional headers to 
be sent by the processor " +
+            "whenever making a new HTTP request. \n " +
+            "Setting a dynamic property name to XYZ and value to ${attribute} 
will result in the header 'XYZ: attribute_value' being sent to the HTTP 
endpoint"),
+})
 @WritesAttributes({
     @WritesAttribute(attribute = "filename", description = "The filename is 
set to the name of the file on the remote server"),
     @WritesAttribute(attribute = "mime.type", description = "The MIME Type of 
the FlowFile, as reported by the HTTP Content-Type header")
@@ -235,6 +244,7 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
+    private volatile List<PropertyDescriptor> customHeaders = new 
ArrayList<>();
 
     private final AtomicBoolean clearState = new AtomicBoolean(false);
 
@@ -281,6 +291,14 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
         if (clearState.getAndSet(false)) {
             context.getStateManager().clear(Scope.LOCAL);
         }
+        if (customHeaders.size() == 0) {
+            for (Map.Entry<PropertyDescriptor, String> property : 
context.getProperties().entrySet()) {
+                // only add the custom defined Headers (i.e. dynamic 
properties)
+                if 
(!getSupportedPropertyDescriptors().contains(property.getKey())) {
+                    customHeaders.add(property.getKey());
+                }
+            }
+        }
     }
 
     @Override
@@ -306,6 +324,17 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
         return results;
     }
 
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .expressionLanguageSupported(true)
+                .addValidator(Validator.VALID)
+                .required(false)
+                .dynamic(true)
+                .build();
+    }
+
     private SSLContext createSSLContext(final SSLContextService service)
             throws KeyStoreException, IOException, NoSuchAlgorithmException, 
CertificateException, KeyManagementException, UnrecoverableKeyException {
 
@@ -467,6 +496,18 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
             if (accept != null) {
                 get.addHeader(HEADER_ACCEPT, accept);
             }
+
+            // Add dynamic headers
+
+            PropertyValue customHeaderValue;
+            for (PropertyDescriptor customProperty : customHeaders) {
+                customHeaderValue = 
context.getProperty(customProperty).evaluateAttributeExpressions();
+                if (StringUtils.isNotBlank(customHeaderValue.getValue())) {
+                    get.addHeader(customProperty.getName(), 
customHeaderValue.getValue());
+                }
+            }
+
+
             // create the http client
             try ( final CloseableHttpClient client = clientBuilder.build() ) {
                 // NOTE: including this inner try in order to swallow 
exceptions on close

http://git-wip-us.apache.org/repos/asf/nifi/blob/ea3c294f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
index d07baa5..8ec7a02 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import javax.servlet.http.HttpServletResponse;
-
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.reporting.InitializationException;
@@ -31,6 +29,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.servlet.http.HttpServletResponse;
 import java.net.URLEncoder;
 import java.util.HashMap;
 import java.util.Map;
@@ -246,6 +245,40 @@ public class TestGetHTTP {
     }
 
     @Test
+    public final void testDynamicHeaders() throws Exception {
+        // set up web service
+        ServletHandler handler = new ServletHandler();
+        handler.addServletWithMapping(UserAgentTestingServlet.class, "/*");
+
+        // create the service
+        TestServer server = new TestServer();
+        server.addHandler(handler);
+
+        try {
+            server.startServer();
+
+            String destination = server.getUrl();
+
+            // set up NiFi mock controller
+            controller = TestRunners.newTestRunner(GetHTTP.class);
+            controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
+            controller.setProperty(GetHTTP.URL, destination);
+            controller.setProperty(GetHTTP.FILENAME, "testFile");
+            controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, 
"application/json");
+            controller.setProperty(GetHTTP.USER_AGENT, "testUserAgent");
+            controller.setProperty("Static-Header", "StaticHeaderValue");
+            controller.setProperty("EL-Header", "${now()}");
+
+            controller.run();
+            controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
+
+            // shutdown web service
+        } finally {
+            server.shutdownServer();
+        }
+    }
+
+    @Test
     public final void testExpressionLanguage() throws Exception {
         // set up web service
         ServletHandler handler = new ServletHandler();

Reply via email to