This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1ee6dba00a1ce8654b730245bba7e43395e99fac
Author: Eduardo Fontes <[email protected]>
AuthorDate: Wed Nov 20 16:33:27 2019 -0300

    NIFI-6886 - Bugfix
    
    Attribute peerPersistence can be null, generating Bulletin WARNs "Unable to 
refresh Remote Group's peers due to null".
    
    Rollback
    
    The fix is inside site-to-site-reporting-task-bundle
    
    Modify getClient()
    
    Get ConfigurationContext and ReportingContext to provide a StateManager.
    
    Modify OnScheduled setup()
    
    The OnScheduled setup() now saves the ConfigurationContext to lazily create a 
SiteToSiteClient with ReportingContext through an overloaded setup().
    
    Modify OnTrigger
    
    Lazily creates SiteToSiteClient to provide a StateManager
    
    Modify OnTrigger
    
    Lazily create SiteToSiteClient to provide a StateManager
    
    Modify OnTrigger
    
    Lazily create SiteToSiteClient to provide a StateManager
    
    Modify OnTrigger
    
    Lazily create SiteToSiteClient to provide a StateManager
    
    Retry compile
    
    Fix maven-checkstyle-plugin
    
    Fix maven-checkstyle-plugin
    
    Fix maven-checkstyle-plugin
    
    Fix maven-checkstyle-plugin
    
    Update AbstractSiteToSiteReportingTask.java
    
    Removed the OnScheduled setup(ConfigurationContext) because it is not needed.
    
    Update SiteToSiteUtils.java
    
    Removed ConfigurationContext from the getClient parameters because 
ReportingContext shares the same properties.
---
 .../reporting/AbstractSiteToSiteReportingTask.java |  9 ++++-----
 .../reporting/SiteToSiteBulletinReportingTask.java | 13 +++++++-----
 .../reporting/SiteToSiteMetricsReportingTask.java  |  3 +++
 .../SiteToSiteProvenanceReportingTask.java         |  3 +++
 .../reporting/SiteToSiteStatusReportingTask.java   |  3 +++
 .../apache/nifi/reporting/s2s/SiteToSiteUtils.java | 23 +++++++++++-----------
 6 files changed, 33 insertions(+), 21 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 48f36a3..267a058 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -34,10 +34,8 @@ import javax.json.JsonArray;
 import javax.json.JsonObjectBuilder;
 import javax.json.JsonValue;
 
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.remote.Transaction;
@@ -118,9 +116,10 @@ public abstract class AbstractSiteToSiteReportingTask 
extends AbstractReportingT
         return properties;
     }
 
-    @OnScheduled
-    public void setup(final ConfigurationContext context) throws IOException {
-        siteToSiteClient = SiteToSiteUtils.getClient(context, getLogger());
+    public void setup(final ReportingContext reportContext) throws IOException 
{
+        if (siteToSiteClient == null) { // lazily create once; later triggers reuse the existing client
+            siteToSiteClient = SiteToSiteUtils.getClient(reportContext, 
getLogger());
+        }
     }
 
     @OnStopped
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index 5f470c7..3e07759 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -138,11 +138,14 @@ public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReporting
 
         // Send the JSON document for the current batch
         try {
-                final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
-                if (transaction == null) {
-                    getLogger().info("All destination nodes are penalized; 
will attempt to send data later");
-                    return;
-                }
+            // Lazily create SiteToSiteClient to provide a StateManager
+            setup(context);
+
+            final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                getLogger().info("All destination nodes are penalized; will 
attempt to send data later");
+                return;
+            }
 
             final Map<String, String> attributes = new HashMap<>();
             final String transactionId = UUID.randomUUID().toString();
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
index 9fcdaac..e781dfa 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
@@ -192,6 +192,9 @@ public class SiteToSiteMetricsReportingTask extends 
AbstractSiteToSiteReportingT
             }
 
             try {
+                // Lazily create SiteToSiteClient to provide a StateManager
+                setup(context);
+
                 long start = System.nanoTime();
                 final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index e17f59e..0ff507c 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -305,6 +305,9 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
             // Send the JSON document for the current batch
             try {
+                // Lazily create SiteToSiteClient to provide a StateManager
+                setup(context);
+
                 final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
                     // Throw an exception to avoid provenance event id will 
not proceed so that those can be consumed again.
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index 0027da8..2466827 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -158,6 +158,9 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         while(!jsonBatch.isEmpty()) {
             // Send the JSON document for the current batch
             try {
+                // Lazily create SiteToSiteClient to provide a StateManager
+                setup(context);
+
                 long start = System.nanoTime();
                 final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
index cc977b5..feea3dd 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java
@@ -20,7 +20,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
@@ -29,6 +28,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StringUtils;
@@ -143,8 +143,8 @@ public class SiteToSiteUtils {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static SiteToSiteClient getClient(ConfigurationContext context, 
ComponentLog logger) {
-        final SSLContextService sslContextService = 
context.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class);
+    public static SiteToSiteClient getClient(ReportingContext reportContext, 
ComponentLog logger) {
+        final SSLContextService sslContextService = 
reportContext.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class);
         final SSLContext sslContext = sslContextService == null ? null : 
sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
         final EventReporter eventReporter = (EventReporter) (severity, 
category, message) -> {
             switch (severity) {
@@ -158,22 +158,23 @@ public class SiteToSiteUtils {
                     break;
             }
         };
-        final String destinationUrl = 
context.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
+        final String destinationUrl = 
reportContext.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
 
-        final SiteToSiteTransportProtocol mode = 
SiteToSiteTransportProtocol.valueOf(context.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());
-        final HttpProxy httpProxy = 
mode.equals(SiteToSiteTransportProtocol.RAW) || 
StringUtils.isEmpty(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue())
 ? null
-                : new 
HttpProxy(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), 
context.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
-                
context.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), 
context.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());
+        final SiteToSiteTransportProtocol mode = 
SiteToSiteTransportProtocol.valueOf(reportContext.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());
+        final HttpProxy httpProxy = 
mode.equals(SiteToSiteTransportProtocol.RAW) || 
StringUtils.isEmpty(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue())
 ? null
+                : new 
HttpProxy(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(),
 reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
+                
reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), 
reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());
 
         return new SiteToSiteClient.Builder()
                 .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
-                
.portName(context.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
-                
.useCompression(context.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
+                
.portName(reportContext.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
+                
.useCompression(reportContext.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
                 .eventReporter(eventReporter)
                 .sslContext(sslContext)
-                
.timeout(context.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS),
 TimeUnit.MILLISECONDS)
+                
.timeout(reportContext.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS),
 TimeUnit.MILLISECONDS)
                 .transportProtocol(mode)
                 .httpProxy(httpProxy)
+                .stateManager(reportContext.getStateManager())
                 .build();
     }
 

Reply via email to