This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1ee6dba00a1ce8654b730245bba7e43395e99fac Author: Eduardo Fontes <[email protected]> AuthorDate: Wed Nov 20 16:33:27 2019 -0300 NIFI-6886 - Bugfix: attribute peerPersistence can be null, generating Bulletin WARNs "Unable to refresh Remote Group's peers due to null". Rollback. The fix is inside site-to-site-reporting-task-bundle. Modify getClient(): get ConfigurationContext and ReportingContext to provide a StateManager. Modify OnScheduled setup(): the OnScheduled setup() now saves the ConfigurationContext to lazily create a SiteToSiteClient with ReportingContext through an overloaded setup(). Modify OnTrigger: lazily create SiteToSiteClient to provide a StateManager. Modify OnTrigger: lazily create SiteToSiteClient to provide a StateManager. Modify OnTrigger: lazily create SiteToSiteClient to provide a StateManager. Modify OnTrigger: lazily create SiteToSiteClient to provide a StateManager. Modify OnTrigger: lazily create SiteToSiteClient to provide a StateManager. Retry compile. Fix maven-checkstyle-plugin. Fix maven-checkstyle-plugin. Fix maven-checkstyle-plugin. Fix maven-checkstyle-plugin. Update AbstractSiteToSiteReportingTask.java: removed the OnScheduled setup(ConfigurationContext) because it is not needed. Update SiteToSiteUtils.java: removed ConfigurationContext from the getClient parameters because ReportingContext shares the same properties. 
--- .../reporting/AbstractSiteToSiteReportingTask.java | 9 ++++----- .../reporting/SiteToSiteBulletinReportingTask.java | 13 +++++++----- .../reporting/SiteToSiteMetricsReportingTask.java | 3 +++ .../SiteToSiteProvenanceReportingTask.java | 3 +++ .../reporting/SiteToSiteStatusReportingTask.java | 3 +++ .../apache/nifi/reporting/s2s/SiteToSiteUtils.java | 23 +++++++++++----------- 6 files changed, 33 insertions(+), 21 deletions(-) diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index 48f36a3..267a058 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -34,10 +34,8 @@ import javax.json.JsonArray; import javax.json.JsonObjectBuilder; import javax.json.JsonValue; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.remote.Transaction; @@ -118,9 +116,10 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT return properties; } - @OnScheduled - public void setup(final ConfigurationContext context) throws IOException { - siteToSiteClient = SiteToSiteUtils.getClient(context, getLogger()); + public void setup(final ReportingContext reportContext) throws 
IOException { + if (siteToSiteClient != null) { + siteToSiteClient = SiteToSiteUtils.getClient(reportContext, getLogger()); + } } @OnStopped diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index 5f470c7..3e07759 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -138,11 +138,14 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting // Send the JSON document for the current batch try { - final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); - if (transaction == null) { - getLogger().info("All destination nodes are penalized; will attempt to send data later"); - return; - } + // Lazily create SiteToSiteClient to provide a StateManager + setup(context); + + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().info("All destination nodes are penalized; will attempt to send data later"); + return; + } final Map<String, String> attributes = new HashMap<>(); final String transactionId = UUID.randomUUID().toString(); diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java index 
9fcdaac..e781dfa 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -192,6 +192,9 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT } try { + // Lazily create SiteToSiteClient to provide a StateManager + setup(context); + long start = System.nanoTime(); final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index e17f59e..0ff507c 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -305,6 +305,9 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti // Send the JSON document for the current batch try { + // Lazily create SiteToSiteClient to provide a StateManager + setup(context); + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { // Throw an exception to avoid provenance event id will not proceed so that those can be consumed again. 
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index 0027da8..2466827 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -158,6 +158,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa while(!jsonBatch.isEmpty()) { // Send the JSON document for the current batch try { + // Lazily create SiteToSiteClient to provide a StateManager + setup(context); + long start = System.nanoTime(); final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); if (transaction == null) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java index cc977b5..feea3dd 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/s2s/SiteToSiteUtils.java @@ -20,7 +20,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; -import 
org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.events.EventReporter; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; @@ -29,6 +28,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.StringUtils; @@ -143,8 +143,8 @@ public class SiteToSiteUtils { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - public static SiteToSiteClient getClient(ConfigurationContext context, ComponentLog logger) { - final SSLContextService sslContextService = context.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class); + public static SiteToSiteClient getClient(ReportingContext reportContext, ComponentLog logger) { + final SSLContextService sslContextService = reportContext.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class); final SSLContext sslContext = sslContextService == null ? 
null : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); final EventReporter eventReporter = (EventReporter) (severity, category, message) -> { switch (severity) { @@ -158,22 +158,23 @@ public class SiteToSiteUtils { break; } }; - final String destinationUrl = context.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue(); + final String destinationUrl = reportContext.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue(); - final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue()); - final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue()) ? null - : new HttpProxy(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(), - context.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), context.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue()); + final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(reportContext.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue()); + final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue()) ? 
null + : new HttpProxy(reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(), + reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(), reportContext.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue()); return new SiteToSiteClient.Builder() .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl)) - .portName(context.getProperty(SiteToSiteUtils.PORT_NAME).getValue()) - .useCompression(context.getProperty(SiteToSiteUtils.COMPRESS).asBoolean()) + .portName(reportContext.getProperty(SiteToSiteUtils.PORT_NAME).getValue()) + .useCompression(reportContext.getProperty(SiteToSiteUtils.COMPRESS).asBoolean()) .eventReporter(eventReporter) .sslContext(sslContext) - .timeout(context.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + .timeout(reportContext.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) .transportProtocol(mode) .httpProxy(httpProxy) + .stateManager(reportContext.getStateManager()) .build(); }
