Repository: nifi
Updated Branches:
  refs/heads/master b6eb0ac0f -> e05005584


NIFI-3809 - Added HTTP mode and HTTP proxy for S2S Reporting Tasks

This closes #1754.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e0500558
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e0500558
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e0500558

Branch: refs/heads/master
Commit: e05005584d52b560771a0ccf2766ee9ee92b6518
Parents: b6eb0ac
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Thu May 4 22:43:32 2017 +0200
Committer: Koji Kawamura <ijokaruma...@apache.org>
Committed: Thu May 25 15:31:49 2017 +0900

----------------------------------------------------------------------
 .../AbstractSiteToSiteReportingTask.java        | 59 ++++++++++++++++++++
 .../SiteToSiteBulletinReportingTask.java        | 16 ++----
 .../TestSiteToSiteBulletinReportingTask.java    |  6 ++
 3 files changed, 69 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e0500558/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 37ed737..fa123a3 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -27,7 +27,10 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.StringUtils;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
@@ -44,6 +47,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
 
     static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
             .name("Destination URL")
+            .displayName("Destination URL")
             .description("The URL of the destination NiFi instance to send data to, " +
                     "should be in the format http(s)://host:port/nifi.")
             .required(true)
@@ -52,6 +56,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             .build();
     static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder()
             .name("Input Port Name")
+            .displayName("Input Port Name")
             .description("The name of the Input Port to deliver data to.")
             .required(true)
             .expressionLanguageSupported(true)
@@ -59,12 +64,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             .build();
     static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
+            .displayName("SSL Context Service")
             .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.")
             .required(false)
             .identifiesControllerService(SSLContextService.class)
             .build();
     static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder()
             .name("Instance URL")
+            .displayName("Instance URL")
             .description("The URL of this instance to use in the Content URI of each event.")
             .required(true)
             .expressionLanguageSupported(true)
@@ -73,6 +80,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             .build();
     static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder()
             .name("Compress Events")
+            .displayName("Compress Events")
             .description("Indicates whether or not to compress the data being sent.")
             .required(true)
             .allowableValues("true", "false")
@@ -80,6 +88,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             .build();
     static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
             .name("Communications Timeout")
+            .displayName("Communications Timeout")
             .description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction")
             .required(true)
             .defaultValue("30 secs")
@@ -87,11 +96,49 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
             .build();
     static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
             .name("Batch Size")
+            .displayName("Batch Size")
             .description("Specifies how many records to send in a single batch, at most.")
             .required(true)
             .defaultValue("1000")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
+    static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("s2s-transport-protocol")
+            .displayName("Transport Protocol")
+            .description("Specifies which transport protocol to use for Site-to-Site communication.")
+            .required(true)
+            .allowableValues(SiteToSiteTransportProtocol.values())
+            .defaultValue(SiteToSiteTransportProtocol.RAW.name())
+            .build();
+    static final PropertyDescriptor HTTP_PROXY_HOSTNAME = new PropertyDescriptor.Builder()
+            .name("s2s-http-proxy-hostname")
+            .displayName("HTTP Proxy hostname")
+            .description("Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+    static final PropertyDescriptor HTTP_PROXY_PORT = new PropertyDescriptor.Builder()
+            .name("s2s-http-proxy-port")
+            .displayName("HTTP Proxy port")
+            .description("Specify the proxy server's port number, optional. If not specified, default port 80 will be used.")
+            .required(false)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
+            .name("s2s-http-proxy-username")
+            .displayName("HTTP Proxy username")
+            .description("Specify an user name to connect to the proxy server, optional.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+    static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
+            .name("s2s-http-proxy-password")
+            .displayName("HTTP Proxy password")
+            .description("Specify an user password to connect to the proxy server, optional.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
 
     protected volatile SiteToSiteClient siteToSiteClient;
 
@@ -105,6 +152,11 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
         properties.add(COMPRESS);
         properties.add(TIMEOUT);
         properties.add(BATCH_SIZE);
+        properties.add(TRANSPORT_PROTOCOL);
+        properties.add(HTTP_PROXY_HOSTNAME);
+        properties.add(HTTP_PROXY_PORT);
+        properties.add(HTTP_PROXY_USERNAME);
+        properties.add(HTTP_PROXY_PASSWORD);
         return properties;
     }
 
@@ -131,6 +183,11 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
 
         final String destinationUrl = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue();
 
+        final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
+        final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
+                : new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(),
+                context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());
+
         siteToSiteClient = new SiteToSiteClient.Builder()
                 .url(destinationUrl)
                 .portName(context.getProperty(PORT_NAME).getValue())
@@ -138,6 +195,8 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
                 .eventReporter(eventReporter)
                 .sslContext(sslContext)
                 
.timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS)
+                .transportProtocol(mode)
+                .httpProxy(httpProxy)
                 .build();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0500558/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index 566b780..b829c1e 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -74,20 +74,12 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting
 
     private volatile long lastSentBulletinId = -1L;
 
-    static List<PropertyDescriptor> descriptors = new ArrayList<>();
-
-    static {
-        descriptors.add(DESTINATION_URL);
-        descriptors.add(PORT_NAME);
-        descriptors.add(SSL_CONTEXT);
-        descriptors.add(COMPRESS);
-        descriptors.add(TIMEOUT);
-        descriptors.add(PLATFORM);
-    }
-
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(PLATFORM);
+        properties.remove(BATCH_SIZE);
+        return properties;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0500558/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
index d5bce1b..d247a4e 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
@@ -39,6 +39,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockPropertyValue;
 import org.junit.Assert;
@@ -126,6 +127,11 @@ public class TestSiteToSiteBulletinReportingTask {
         }
         properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
         properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
+        properties.put(SiteToSiteBulletinReportingTask.TRANSPORT_PROTOCOL, SiteToSiteTransportProtocol.HTTP.name());
+        properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_HOSTNAME, "localhost");
+        properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PORT, "80");
+        properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_USERNAME, "username");
+        properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PASSWORD, "password");
 
         Mockito.doAnswer(new Answer<PropertyValue>() {
             @Override

Reply via email to