NIFI-950: Make component validation asynchronous
NIFI-950: Still seeing some slow response times when instantiating a large 
template in cluster mode so making some minor tweaks based on the results of 
CPU profiling
NIFI-5112: Refactored FlowSerializer so that it creates the desired 
intermediate data model that can be serialized, separate from serializing. This 
allows us to hold the FlowController's Read Lock only while creating the data 
model, not while actually serializing the data. Configured Jersey Client in 
ThreadPoolRequestReplicator not to look for features using the Service Loader 
for every request. Updated Template object to hold a DOM Node that represents 
the template contents instead of having to serialize the DTO, then parse the 
serialized form as a DOM object each time that it needs to be serialized.
NIFI-5112: Change ThreadPoolRequestReplicator to use OkHttp client instead of 
Jersey Client
NIFI-5111: Ensure that if a node is no longer cluster coordinator, that it 
clears any stale heartbeats.
NIFI-5110: Notify StandardProcessScheduler when a component is removed so that 
it will clean up any resource related to component lifecycle.
NIFI-950: Avoid gathering the Status objects for entire flow when we don't need 
them; removed unnecessary code
NIFI-950: Bug fixes
NIFI-950: Bug fix; added validation status to ProcessorDTO, 
ControllerServiceDTO, ReportingTaskDTO; updated DebugFlow to allow for pause 
time to be set in the customValidate method for testing functionality
NIFI-950: Addressing test failures
NIFI-950: Bug fixes
NIFI-950: Addressing review feedback
NIFI-950: Fixed validation logic in mock framework
This closes #2693


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/604656fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/604656fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/604656fe

Branch: refs/heads/master
Commit: 604656fe8829a79549c24d34c0149891e0edebe9
Parents: ea41b41
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Apr 11 15:36:54 2018 -0400
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Wed May 16 14:39:23 2018 -0400

----------------------------------------------------------------------
 .../nifi/components/ConfigurableComponent.java  |   6 +-
 .../nifi/components/PropertyDescriptor.java     |  92 +--
 .../nifi/components/ValidationResult.java       |   2 +-
 .../org/apache/nifi/components/Validator.java   |   2 +-
 .../nifi/web/util/NiFiHostnameVerifier.java     |  61 ++
 .../java/org/apache/nifi/web/util/WebUtils.java |  33 +-
 .../apache/nifi/util/MockProcessContext.java    |  69 +-
 .../nifi/schema/access/SchemaAccessUtils.java   |   4 +-
 .../AccessPolicyProviderInvocationHandler.java  |  12 +-
 .../AuthorizationAuditorInvocationHandler.java  |  11 +-
 .../AuthorizerInvocationHandler.java            |  11 +-
 .../nifi/web/api/dto/ControllerServiceDTO.java  |  15 +
 .../apache/nifi/web/api/dto/FlowSnippetDTO.java |   2 +-
 .../apache/nifi/web/api/dto/ProcessorDTO.java   |  15 +
 .../nifi/web/api/dto/ReportingTaskDTO.java      |  15 +
 .../authorization/resource/ResourceFactory.java |  10 +-
 .../nifi-framework-cluster/pom.xml              |  71 +-
 .../heartbeat/AbstractHeartbeatMonitor.java     |   5 +-
 .../http/replication/HttpReplicationClient.java |  31 +
 .../http/replication/PreparedRequest.java       |  28 +
 .../ThreadPoolRequestReplicator.java            | 230 ++----
 .../replication/okhttp/EntitySerializer.java    |  25 +
 .../replication/okhttp/JacksonResponse.java     | 237 ++++++
 .../okhttp/JsonEntitySerializer.java            |  42 ++
 .../okhttp/OkHttpPreparedRequest.java           |  62 ++
 .../okhttp/OkHttpReplicationClient.java         | 366 ++++++++++
 .../replication/okhttp/XmlEntitySerializer.java |  60 ++
 .../manager/ControllerServiceEntityMerger.java  |  17 +-
 .../nifi/cluster/manager/ErrorMerger.java       |  31 +-
 .../cluster/manager/ProcessorEntityMerger.java  |   7 +
 .../manager/ReportingTaskEntityMerger.java      |   7 +
 .../ThreadPoolRequestReplicatorFactoryBean.java |  15 +-
 .../TestThreadPoolRequestReplicator.java        | 118 +--
 .../okhttp/TestJsonEntitySerializer.java        |  97 +++
 .../replication/util/MockReplicationClient.java | 217 ++++++
 .../ControllerServiceEntityMergerSpec.groovy    |   4 +-
 .../DisabledServiceValidationResult.java        |  42 ++
 .../components/validation/ValidationState.java  |  40 +
 .../components/validation/ValidationStatus.java |  35 +
 .../validation/ValidationTrigger.java           |  36 +
 .../nifi/controller/AbstractComponentNode.java  | 728 +++++++++++++++++++
 .../controller/AbstractConfiguredComponent.java | 607 ----------------
 .../apache/nifi/controller/ComponentNode.java   | 201 +++++
 .../nifi/controller/ConfiguredComponent.java    | 154 ----
 .../nifi/controller/ProcessScheduler.java       |  28 +-
 .../apache/nifi/controller/ProcessorNode.java   |  10 +-
 .../nifi/controller/ReportingTaskNode.java      |   2 +-
 .../org/apache/nifi/controller/Template.java    |  21 +
 .../ControllerServiceDisabledException.java     |  31 +
 .../service/ControllerServiceNode.java          |  19 +-
 .../service/ControllerServiceProvider.java      |  10 +-
 .../service/ControllerServiceReference.java     |   6 +-
 .../org/apache/nifi/groups/ProcessGroup.java    |   9 +-
 .../controller/TestAbstractComponentNode.java   | 126 ++++
 .../validation/StandardValidationTrigger.java   |  59 ++
 .../validation/TriggerValidationTask.java       |  56 ++
 .../apache/nifi/controller/FlowController.java  | 272 ++++---
 .../nifi/controller/StandardProcessorNode.java  | 409 ++++++-----
 .../apache/nifi/controller/TemplateUtils.java   |   2 +
 .../reporting/AbstractReportingTaskNode.java    |  43 +-
 .../reporting/StandardReportingTaskNode.java    |   9 +-
 .../scheduling/StandardProcessScheduler.java    |  26 +-
 .../serialization/FlowSerializer.java           |  16 +-
 .../serialization/StandardFlowSerializer.java   |  87 ++-
 .../service/ControllerServiceLoader.java        |   9 +
 .../service/ServiceStateTransition.java         |  20 +
 .../service/StandardConfigurationContext.java   |   6 +-
 ...ndardControllerServiceInvocationHandler.java |  11 +-
 .../service/StandardControllerServiceNode.java  |  89 ++-
 .../StandardControllerServiceProvider.java      |  29 +-
 .../StandardControllerServiceReference.java     |  24 +-
 .../nifi/groups/StandardProcessGroup.java       |  78 +-
 .../nifi/persistence/TemplateDeserializer.java  |  17 +-
 .../nifi/persistence/TemplateSerializer.java    |  13 +-
 .../processor/StandardSchedulingContext.java    |   8 +-
 .../processor/StandardValidationContext.java    |  16 +-
 .../flow/mapping/NiFiRegistryFlowMapper.java    |   6 +-
 .../apache/nifi/util/ClassAnnotationPair.java   |  61 ++
 .../org/apache/nifi/util/ReflectionUtils.java   |  79 +-
 .../controller/StandardFlowServiceTest.java     |  22 +-
 .../nifi/controller/TestFlowController.java     |  17 +-
 .../controller/TestStandardProcessorNode.java   |  37 +-
 .../scheduling/StandardProcessSchedulerIT.java  |   6 +-
 .../scheduling/TestProcessorLifecycle.java      | 139 +---
 .../TestStandardProcessScheduler.java           |  43 +-
 .../StandardFlowSerializerTest.java             |   4 +-
 .../StandardControllerServiceProviderIT.java    |   4 +-
 .../StandardControllerServiceProviderTest.java  |   3 +-
 .../TestStandardControllerServiceProvider.java  |  44 +-
 .../service/mock/MockProcessGroup.java          |   4 +-
 .../nifi/util/SynchronousValidationTrigger.java |  35 +
 .../src/test/resources/logback-test.xml         |   2 +-
 .../org/apache/nifi/nar/ExtensionManager.java   |   2 +-
 .../nifi/audit/ControllerServiceAuditor.java    |   6 +-
 .../StandardAuthorizableLookup.java             |  15 +-
 .../nifi/web/StandardNiFiServiceFacade.java     |  69 +-
 .../org/apache/nifi/web/api/FlowResource.java   |  15 +-
 .../nifi/web/api/ProcessGroupResource.java      |   2 +-
 .../apache/nifi/web/api/SnippetResource.java    |  39 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  19 +-
 .../nifi/web/controller/ControllerFacade.java   |  23 +-
 .../web/controller/ControllerSearchService.java |   5 +-
 .../nifi/web/dao/ControllerServiceDAO.java      |   4 +-
 .../dao/impl/StandardControllerServiceDAO.java  |   6 +-
 .../nifi/web/dao/impl/StandardProcessorDAO.java |   2 +-
 .../nifi/processors/standard/DebugFlow.java     |  30 +-
 .../apache/nifi/ssl/SSLContextServiceTest.java  |  20 +-
 107 files changed, 4205 insertions(+), 1992 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java 
b/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java
index cd8c781..2f693da 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java
@@ -26,13 +26,13 @@ public interface ConfigurableComponent {
     /**
      * Validates a set of properties, returning ValidationResults for any
      * invalid properties. All defined properties will be validated. If they 
are
-     * not included in the in the purposed configuration, the default value 
will
+     * not included in the purposed configuration, the default value will
      * be used.
      *
      * @param context of validation
      * @return Collection of validation result objects for any invalid findings
-     * only. If the collection is empty then the component is valid. Guaranteed
-     * non-null
+     *         only. If the collection is empty then the component is valid. 
Guaranteed
+     *         non-null
      */
     Collection<ValidationResult> validate(ValidationContext context);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java 
b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
index 3ac7510..3e767ba 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
@@ -140,9 +140,11 @@ public final class PropertyDescriptor implements 
Comparable<PropertyDescriptor>
      */
     public ValidationResult validate(final String input, final 
ValidationContext context) {
         ValidationResult lastResult = Validator.INVALID.validate(this.name, 
input, context);
+
         if (allowableValues != null && !allowableValues.isEmpty()) {
             final ConstrainedSetValidator csValidator = new 
ConstrainedSetValidator(allowableValues);
             final ValidationResult csResult = csValidator.validate(this.name, 
input, context);
+
             if (csResult.isValid()) {
                 lastResult = csResult;
             } else {
@@ -150,90 +152,32 @@ public final class PropertyDescriptor implements 
Comparable<PropertyDescriptor>
             }
         }
 
-        // if the property descriptor identifies a Controller Service, 
validate that the ControllerService exists, is of the correct type, and is valid
-        if (controllerServiceDefinition != null) {
-            final Set<String> validIdentifiers = 
context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition);
-            if (validIdentifiers != null && validIdentifiers.contains(input)) {
-                final ControllerService controllerService = 
context.getControllerServiceLookup().getControllerService(input);
-                if (!context.isValidationRequired(controllerService)) {
-                    return new ValidationResult.Builder()
-                            .input(input)
-                            .subject(getName())
-                            .valid(true)
-                            .build();
-                }
-
-                final String serviceId = controllerService.getIdentifier();
-                if (!isDependentServiceEnableable(context, serviceId)) {
-                    return new ValidationResult.Builder()
-                            
.input(context.getControllerServiceLookup().getControllerServiceName(serviceId))
-                            .subject(getName())
-                            .valid(false)
-                            .explanation("Controller Service " + 
controllerService + " is disabled")
-                            .build();
-                }
-
-                final Collection<ValidationResult> validationResults = 
controllerService.validate(context.getControllerServiceValidationContext(controllerService));
-                final List<ValidationResult> invalidResults = new 
ArrayList<>();
-                for (final ValidationResult result : validationResults) {
-                    if (!result.isValid()) {
-                        invalidResults.add(result);
-                    }
-                }
-                if (!invalidResults.isEmpty()) {
-                    return new ValidationResult.Builder()
-                        .input(input)
-                        .subject(getName())
-                        .valid(false)
-                        .explanation("Controller Service is not valid: " + 
(invalidResults.size() > 1 ? invalidResults : invalidResults.get(0)))
-                        .build();
-                }
+        for (final Validator validator : validators) {
+            lastResult = validator.validate(this.name, input, context);
+            if (!lastResult.isValid()) {
+                break;
+            }
+        }
 
+        if (getControllerServiceDefinition() != null) {
+            final ControllerService service = 
context.getControllerServiceLookup().getControllerService(input);
+            if (service == null) {
                 return new ValidationResult.Builder()
-                        .input(input)
-                        .subject(getName())
-                        .valid(true)
-                        .build();
+                    .input(input)
+                    .subject(getDisplayName())
+                    .valid(false)
+                    .explanation("Property references a Controller Service 
that does not exist")
+                    .build();
             } else {
                 return new ValidationResult.Builder()
-                        .input(input)
-                        .subject(getName())
-                        .valid(false)
-                        .explanation("Invalid Controller Service: " + input + 
" is not a valid Controller Service Identifier or does not reference the 
correct type of Controller Service")
-                        .build();
+                    .valid(true)
+                    .build();
             }
         }
 
-        for (final Validator validator : validators) {
-            lastResult = validator.validate(this.name, input, context);
-            if (!lastResult.isValid()) {
-                break;
-            }
-        }
         return lastResult;
     }
 
-    /**
-     * Will validate if the dependent service (service identified with the
-     * 'serviceId') is 'enableable' which means that the dependent service is
-     * either in ENABLING or ENABLED state. The important issue here is to
-     * understand the order in which states are assigned:
-     *
-     * - Upon the initialization of the service its state is set to ENABLING.
-     *
-     * - Transition to ENABLED will happen asynchronously.
-     *
-     * So we check first for ENABLING state and if it succeeds we skip the 
check
-     * for ENABLED state even though by the time this method returns the
-     * dependent service's state could be fully ENABLED.
-     */
-    private boolean isDependentServiceEnableable(final ValidationContext 
context, final String serviceId) {
-        boolean enableable = 
context.getControllerServiceLookup().isControllerServiceEnabling(serviceId);
-        if (!enableable) {
-            enableable = 
context.getControllerServiceLookup().isControllerServiceEnabled(serviceId);
-        }
-        return enableable;
-    }
 
     public static final class Builder {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java 
b/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java
index e0beec8..969c2d7 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java
@@ -30,7 +30,7 @@ public class ValidationResult {
     private final String explanation;
     private final boolean valid;
 
-    private ValidationResult(final Builder builder) {
+    protected ValidationResult(final Builder builder) {
         this.subject = builder.subject;
         this.input = builder.input;
         this.explanation = builder.explanation;

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-api/src/main/java/org/apache/nifi/components/Validator.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/Validator.java 
b/nifi-api/src/main/java/org/apache/nifi/components/Validator.java
index a12b532..3befdf8 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/Validator.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/Validator.java
@@ -28,7 +28,7 @@ public interface Validator {
     Validator INVALID = new Validator() {
         @Override
         public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
-            return new 
ValidationResult.Builder().subject(subject).explanation(String.format("'%s' is 
not a supported property", subject)).input(input).build();
+            return new 
ValidationResult.Builder().subject(subject).explanation(String.format("'%s' is 
not a supported property or has no Validator associated with it", 
subject)).input(input).build();
         }
     };
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/NiFiHostnameVerifier.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/NiFiHostnameVerifier.java
 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/NiFiHostnameVerifier.java
new file mode 100644
index 0000000..960af58
--- /dev/null
+++ 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/NiFiHostnameVerifier.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.util;
+
+import java.security.cert.Certificate;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.nifi.security.util.CertificateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NiFiHostnameVerifier implements HostnameVerifier {
+    private static final Logger logger = 
LoggerFactory.getLogger(NiFiHostnameVerifier.class);
+
+    @Override
+    public boolean verify(final String hostname, final SSLSession ssls) {
+        try {
+            for (final Certificate peerCertificate : 
ssls.getPeerCertificates()) {
+                if (peerCertificate instanceof X509Certificate) {
+                    final X509Certificate x509Cert = (X509Certificate) 
peerCertificate;
+                    final String dn = x509Cert.getSubjectDN().getName();
+                    final String commonName = 
CertificateUtils.extractUsername(dn);
+                    if (commonName.equals(hostname)) {
+                        return true;
+                    }
+
+                    final List<String> subjectAltNames = 
CertificateUtils.getSubjectAlternativeNames(x509Cert);
+                    if (subjectAltNames.contains(hostname.toLowerCase())) {
+                        return true;
+                    }
+                }
+            }
+        } catch (final SSLPeerUnverifiedException | 
CertificateParsingException ex) {
+            logger.warn("Hostname Verification encountered exception verifying 
hostname due to: " + ex, ex);
+        }
+
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
index 351d7f9..21a41fd 100644
--- 
a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
+++ 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
@@ -17,23 +17,18 @@
 package org.apache.nifi.web.util;
 
 import java.net.URI;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateParsingException;
-import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.net.ssl.HostnameVerifier;
+
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.UriBuilderException;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.security.util.CertificateUtils;
 import org.glassfish.jersey.client.ClientConfig;
 import 
org.glassfish.jersey.jackson.internal.jackson.jaxrs.json.JacksonJaxbJsonProvider;
 import org.slf4j.Logger;
@@ -100,29 +95,7 @@ public final class WebUtils {
         if (ctx != null) {
 
             // custom hostname verifier that checks subject alternative names 
against the hostname of the URI
-            final HostnameVerifier hostnameVerifier = new HostnameVerifier() {
-                @Override
-                public boolean verify(final String hostname, final SSLSession 
ssls) {
-
-                    try {
-                        for (final Certificate peerCertificate : 
ssls.getPeerCertificates()) {
-                            if (peerCertificate instanceof X509Certificate) {
-                                final X509Certificate x509Cert = 
(X509Certificate) peerCertificate;
-                                final List<String> subjectAltNames = 
CertificateUtils.getSubjectAlternativeNames(x509Cert);
-                                if 
(subjectAltNames.contains(hostname.toLowerCase())) {
-                                    return true;
-                                }
-                            }
-                        }
-                    } catch (final SSLPeerUnverifiedException | 
CertificateParsingException ex) {
-                        logger.warn("Hostname Verification encountered 
exception verifying hostname due to: " + ex, ex);
-                    }
-
-                    return false;
-                }
-            };
-
-            clientBuilder = 
clientBuilder.sslContext(ctx).hostnameVerifier(hostnameVerifier);
+            clientBuilder = clientBuilder.sslContext(ctx).hostnameVerifier(new 
NiFiHostnameVerifier());
         }
 
         clientBuilder = 
clientBuilder.register(ObjectMapperResolver.class).register(JacksonJaxbJsonProvider.class);

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 3448524..a16216d 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -18,6 +18,7 @@ package org.apache.nifi.util;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,6 +35,7 @@ import 
org.apache.nifi.attribute.expression.language.Query.Range;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
@@ -240,7 +242,72 @@ public class MockProcessContext extends 
MockControllerServiceLookup implements S
      * non-null
      */
     public Collection<ValidationResult> validate() {
-        return component.validate(new MockValidationContext(this, 
stateManager, variableRegistry));
+        final List<ValidationResult> results = new ArrayList<>();
+        final ValidationContext validationContext = new 
MockValidationContext(this, stateManager, variableRegistry);
+        final Collection<ValidationResult> componentResults = 
component.validate(validationContext);
+        results.addAll(componentResults);
+
+        final Collection<ValidationResult> serviceResults = 
validateReferencedControllerServices(validationContext);
+        results.addAll(serviceResults);
+        return results;
+    }
+
+    protected final Collection<ValidationResult> 
validateReferencedControllerServices(final ValidationContext validationContext) 
{
+        final List<PropertyDescriptor> supportedDescriptors = 
component.getPropertyDescriptors();
+        if (supportedDescriptors == null) {
+            return Collections.emptyList();
+        }
+
+        final Collection<ValidationResult> validationResults = new 
ArrayList<>();
+        for (final PropertyDescriptor descriptor : supportedDescriptors) {
+            if (descriptor.getControllerServiceDefinition() == null) {
+                // skip properties that aren't for a controller service
+                continue;
+            }
+
+            final String controllerServiceId = 
validationContext.getProperty(descriptor).getValue();
+            if (controllerServiceId == null) {
+                continue;
+            }
+
+            final ControllerService controllerService = 
getControllerService(controllerServiceId);
+            if (controllerService == null) {
+                final ValidationResult result = new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(descriptor.getDisplayName())
+                    .input(controllerServiceId)
+                    .explanation("Invalid Controller Service: " + 
controllerServiceId + " is not a valid Controller Service Identifier")
+                    .build();
+
+                validationResults.add(result);
+                continue;
+            }
+
+            final Class<? extends ControllerService> requiredServiceClass = 
descriptor.getControllerServiceDefinition();
+            if 
(!requiredServiceClass.isAssignableFrom(controllerService.getClass())) {
+                final ValidationResult result = new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(descriptor.getDisplayName())
+                    .input(controllerServiceId)
+                    .explanation("Invalid Controller Service: " + 
controllerServiceId + " does not implement interface " + requiredServiceClass)
+                    .build();
+
+                validationResults.add(result);
+                continue;
+            }
+
+            final boolean enabled = 
isControllerServiceEnabled(controllerServiceId);
+            if (!enabled) {
+                validationResults.add(new ValidationResult.Builder()
+                    .input(controllerServiceId)
+                    .subject(descriptor.getDisplayName())
+                    .explanation("Controller Service with ID " + 
controllerServiceId + " is not enabled")
+                    .valid(false)
+                    .build());
+            }
+        }
+
+        return validationResults;
     }
 
     public boolean isValid() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
index 111b02a..82ea240 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
@@ -85,7 +85,7 @@ public class SchemaAccessUtils {
             .description("Specifies the name of the branch to use when looking 
up the schema in the Schema Registry property. " +
                     "If the chosen Schema Registry does not support branching, 
this value will be ignored.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
             .build();
 
@@ -95,7 +95,7 @@ public class SchemaAccessUtils {
             .description("Specifies the version of the schema to lookup in the 
Schema Registry. " +
                     "If not specified then the latest version of the schema 
will be retrieved.")
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
             .build();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java
index e41afa3..63bd2ab 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java
@@ -23,10 +23,18 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
 public class AccessPolicyProviderInvocationHandler implements 
InvocationHandler {
-
+    private static final Method getUserGroupProviderMethod;
     private final AccessPolicyProvider accessPolicyProvider;
     private final ClassLoader classLoader;
 
+    static {
+        try {
+            getUserGroupProviderMethod = 
AccessPolicyProvider.class.getMethod("getUserGroupProvider");
+        } catch (final Exception e) {
+            throw new RuntimeException("Unable to obtain necessary class 
information for AccessPolicyProvider", e);
+        }
+    }
+
     public AccessPolicyProviderInvocationHandler(final AccessPolicyProvider 
accessPolicyProvider, final ClassLoader classLoader) {
         this.accessPolicyProvider = accessPolicyProvider;
         this.classLoader = classLoader;
@@ -35,7 +43,7 @@ public class AccessPolicyProviderInvocationHandler implements 
InvocationHandler
     @Override
     public Object invoke(final Object proxy, final Method method, final 
Object[] args) throws Throwable {
         try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(classLoader)) {
-            if 
(AccessPolicyProvider.class.getMethod("getUserGroupProvider").equals(method)) {
+            if (getUserGroupProviderMethod.equals(method)) {
                 final UserGroupProvider userGroupProvider = 
(UserGroupProvider) method.invoke(accessPolicyProvider, args);
                 if (userGroupProvider == null) {
                     return userGroupProvider;

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java
index 7f8d76c..1dc81d3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java
@@ -24,6 +24,15 @@ public class AuthorizationAuditorInvocationHandler 
implements InvocationHandler
 
     private final Authorizer authorizer;
     private final AuthorizationAuditor auditor;
+    private static final Method auditAccessAttemptMethod;
+
+    static {
+        try {
+            auditAccessAttemptMethod = 
AuthorizationAuditor.class.getMethod("auditAccessAttempt", 
AuthorizationRequest.class, AuthorizationResult.class);
+        } catch (final Exception e) {
+            throw new RuntimeException("Unable to obtain necessary class 
information for AccessPolicyProvider", e);
+        }
+    }
 
     public AuthorizationAuditorInvocationHandler(final Authorizer authorizer, 
final AuthorizationAuditor auditor) {
         this.authorizer = authorizer;
@@ -33,7 +42,7 @@ public class AuthorizationAuditorInvocationHandler implements 
InvocationHandler
     @Override
     public Object invoke(final Object proxy, final Method method, final 
Object[] args) throws Throwable {
         try {
-            if (AuthorizationAuditor.class.getMethod("auditAccessAttempt", 
AuthorizationRequest.class, AuthorizationResult.class).equals(method)) {
+            if (auditAccessAttemptMethod.equals(method)) {
                 return method.invoke(auditor, args);
             } else {
                 return method.invoke(authorizer, args);

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java
index 228fe09..c8b6771 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java
@@ -24,9 +24,18 @@ import java.lang.reflect.Method;
 
 public class AuthorizerInvocationHandler implements InvocationHandler {
 
+    private static final Method getAccessPolicyProviderMethod;
     private final Authorizer authorizer;
     private final ClassLoader classLoader;
 
+    static {
+        try {
+            getAccessPolicyProviderMethod = 
ManagedAuthorizer.class.getMethod("getAccessPolicyProvider");
+        } catch (final Exception e) {
+            throw new RuntimeException("Unable to obtain necessary class 
information for AccessPolicyProvider", e);
+        }
+    }
+
     public AuthorizerInvocationHandler(final Authorizer authorizer, final 
ClassLoader classLoader) {
         this.authorizer = authorizer;
         this.classLoader = classLoader;
@@ -35,7 +44,7 @@ public class AuthorizerInvocationHandler implements 
InvocationHandler {
     @Override
     public Object invoke(final Object proxy, final Method method, final 
Object[] args) throws Throwable {
         try (final NarCloseable narCloseable = 
NarCloseable.withComponentNarLoader(classLoader)) {
-            if 
(ManagedAuthorizer.class.getMethod("getAccessPolicyProvider").equals(method)) {
+            if (getAccessPolicyProviderMethod.equals(method)) {
                 final AccessPolicyProvider accessPolicyProvider = 
(AccessPolicyProvider) method.invoke(authorizer, args);
                 if (accessPolicyProvider == null) {
                     return accessPolicyProvider;

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java
index 571598c..dbea636 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java
@@ -30,6 +30,9 @@ import java.util.Set;
  */
 @XmlType(name = "controllerService")
 public class ControllerServiceDTO extends ComponentDTO {
+    public static final String VALID = "VALID";
+    public static final String INVALID = "INVALID";
+    public static final String VALIDATING = "VALIDATING";
 
     private String name;
     private String type;
@@ -52,6 +55,7 @@ public class ControllerServiceDTO extends ComponentDTO {
     private Set<ControllerServiceReferencingComponentEntity> 
referencingComponents;
 
     private Collection<String> validationErrors;
+    private String validationStatus;
 
     /**
      * @return controller service name
@@ -298,6 +302,17 @@ public class ControllerServiceDTO extends ComponentDTO {
         this.validationErrors = validationErrors;
     }
 
+    @ApiModelProperty(value = "Indicates whether the Processor is valid, 
invalid, or still in the process of validating (i.e., it is unknown whether or 
not the Processor is valid)",
+        readOnly = true,
+        allowableValues = VALID + ", " + INVALID + ", " + VALIDATING)
+    public String getValidationStatus() {
+        return validationStatus;
+    }
+
+    public void setValidationStatus(String validationStatus) {
+        this.validationStatus = validationStatus;
+    }
+
     @Override
     public int hashCode() {
         final String id = getId();

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java
index c4b34f2..be532e1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java
@@ -196,7 +196,7 @@ public class FlowSnippetDTO {
         TreeSet<T> components = new TreeSet<>(new Comparator<ComponentDTO>() {
             @Override
             public int compare(ComponentDTO c1, ComponentDTO c2) {
-                return 
UUID.fromString(c1.getId()).compareTo(UUID.fromString(c2.getId()));
+                return c1.getId().compareTo(c2.getId());
             }
         });
         components.addAll(dtos);

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
index b3a6c5e..7fcdbbc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java
@@ -28,6 +28,9 @@ import java.util.Map;
  */
 @XmlType(name = "processor")
 public class ProcessorDTO extends ComponentDTO {
+    public static final String VALID = "VALID";
+    public static final String INVALID = "INVALID";
+    public static final String VALIDATING = "VALIDATING";
 
     private String name;
     private String type;
@@ -49,6 +52,7 @@ public class ProcessorDTO extends ComponentDTO {
     private ProcessorConfigDTO config;
 
     private Collection<String> validationErrors;
+    private String validationStatus;
 
     public ProcessorDTO() {
         super();
@@ -306,6 +310,17 @@ public class ProcessorDTO extends ComponentDTO {
         this.validationErrors = validationErrors;
     }
 
+    @ApiModelProperty(value = "Indicates whether the Processor is valid, 
invalid, or still in the process of validating (i.e., it is unknown whether or 
not the Processor is valid)",
+        readOnly = true,
+        allowableValues = VALID + ", " + INVALID + ", " + VALIDATING)
+    public String getValidationStatus() {
+        return validationStatus;
+    }
+
+    public void setValidationStatus(String validationStatus) {
+        this.validationStatus = validationStatus;
+    }
+
     /**
      * @return the description for this processor
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
index 22bfafb..38f3ec8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java
@@ -27,6 +27,9 @@ import java.util.Map;
  */
 @XmlType(name = "reportingTask")
 public class ReportingTaskDTO extends ComponentDTO {
+    public static final String VALID = "VALID";
+    public static final String INVALID = "INVALID";
+    public static final String VALIDATING = "VALIDATING";
 
     private String name;
     private String type;
@@ -50,6 +53,7 @@ public class ReportingTaskDTO extends ComponentDTO {
     private String annotationData;
 
     private Collection<String> validationErrors;
+    private String validationStatus;
     private Integer activeThreadCount;
 
     /**
@@ -298,6 +302,17 @@ public class ReportingTaskDTO extends ComponentDTO {
         this.validationErrors = validationErrors;
     }
 
+    @ApiModelProperty(value = "Indicates whether the Processor is valid, 
invalid, or still in the process of validating (i.e., it is unknown whether or 
not the Processor is valid)",
+        readOnly = true,
+        allowableValues = VALID + ", " + INVALID + ", " + VALIDATING)
+    public String getValidationStatus() {
+        return validationStatus;
+    }
+
+    public void setValidationStatus(String validationStatus) {
+        this.validationStatus = validationStatus;
+    }
+
     /**
      * @return default scheduling period for the different scheduling 
strategies
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java
index c18598b..6f545f3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java
@@ -338,7 +338,7 @@ public final class ResourceFactory {
         return new Resource() {
             @Override
             public String getIdentifier() {
-                return String.format("%s/%s", 
RESTRICTED_COMPONENTS_RESOURCE.getIdentifier(), 
requiredPermission.getPermissionIdentifier());
+                return RESTRICTED_COMPONENTS_RESOURCE.getIdentifier() + "/" + 
requiredPermission.getPermissionIdentifier();
             }
 
             @Override
@@ -374,7 +374,7 @@ public final class ResourceFactory {
         return new Resource() {
             @Override
             public String getIdentifier() {
-                return String.format("%s%s", 
ResourceType.DataTransfer.getValue(), resource.getIdentifier());
+                return ResourceType.DataTransfer.getValue() + 
resource.getIdentifier();
             }
 
             @Override
@@ -409,7 +409,7 @@ public final class ResourceFactory {
         return new Resource() {
             @Override
             public String getIdentifier() {
-                return String.format("%s%s", POLICY_RESOURCE.getIdentifier(), 
resource.getIdentifier());
+                return POLICY_RESOURCE.getIdentifier() + 
resource.getIdentifier();
             }
 
             @Override
@@ -439,7 +439,7 @@ public final class ResourceFactory {
         return new Resource() {
             @Override
             public String getIdentifier() {
-                return String.format("%s/%s", resourceType.getValue(), 
identifier);
+                return resourceType.getValue() + "/" + identifier;
             }
 
             @Override
@@ -500,7 +500,7 @@ public final class ResourceFactory {
         return new Resource() {
             @Override
             public String getIdentifier() {
-                return String.format("%s%s", DATA_RESOURCE.getIdentifier(), 
resource.getIdentifier());
+                return DATA_RESOURCE.getIdentifier() + 
resource.getIdentifier();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index 540e426..9f020ab 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -1,19 +1,16 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+    license agreements. See the NOTICE file distributed with this work for 
additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    You under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
@@ -89,14 +86,25 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-compress</artifactId>
         </dependency>
+
         <!-- third party dependencies -->
-        
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.9.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
         <!-- sun dependencies -->
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
         </dependency>
-        
+
         <!-- commons dependencies -->
         <dependency>
             <groupId>commons-io</groupId>
@@ -106,24 +114,6 @@
             <groupId>commons-net</groupId>
             <artifactId>commons-net</artifactId>
         </dependency>
-        
-        <!-- jersey dependencies -->
-        <dependency>
-            <groupId>org.glassfish.jersey.core</groupId>
-            <artifactId>jersey-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.glassfish.jersey.media</groupId>
-            <artifactId>jersey-media-json-jackson</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.glassfish.jersey.core</groupId>
-            <artifactId>jersey-common</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.glassfish.jersey.inject</groupId>
-            <artifactId>jersey-hk2</artifactId>
-        </dependency>
 
         <!-- jackson dependencies -->
         <dependency>
@@ -141,6 +131,11 @@
             <artifactId>jackson-module-jaxb-annotations</artifactId>
             <version>${jackson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.10.0</version>
+        </dependency>
 
         <!-- spring dependencies -->
         <dependency>
@@ -155,7 +150,7 @@
             <groupId>org.springframework</groupId>
             <artifactId>spring-context</artifactId>
         </dependency>
-        
+
         <!-- testing dependencies for ZooKeeper / Curator -->
         <dependency>
             <groupId>org.apache.curator</groupId>
@@ -173,7 +168,7 @@
             <scope>test</scope>
         </dependency>
 
-        <!-- Spock testing dependencies-->
+        <!-- Spock testing dependencies -->
         <dependency>
             <groupId>org.spockframework</groupId>
             <artifactId>spock-core</artifactId>
@@ -198,5 +193,5 @@
                 </configuration>
             </plugin>
         </plugins>
-    </build>  
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 4c251f9..35bf510 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -126,7 +126,10 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
             // Occasionally Curator appears to not notify us that we have lost 
the elected leader role, or does so
             // on a very large delay. So before we kick the node out of the 
cluster, we want to first check what the
             // ZNode in ZooKeeper says, and ensure that this is the node that 
is being advertised as the appropriate
-            // destination for heartbeats.
+            // destination for heartbeats. In this case, we will also purge 
any heartbeats that we may have received,
+            // so that if we are later elected the coordinator, we don't have 
any stale heartbeats stashed away, which
+            // could lead to immediately disconnecting nodes when this node is 
elected coordinator.
+            purgeHeartbeats();
             logger.debug("It appears that this node is no longer the actively 
elected cluster coordinator. Will not request that node disconnect.");
             return;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
new file mode 100644
index 0000000..bcd3a1f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+
+public interface HttpReplicationClient {
+
+    PreparedRequest prepareRequest(String method, Map<String, String> headers, 
Object entity);
+
+    Response replicate(PreparedRequest request, String uri) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/PreparedRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/PreparedRequest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/PreparedRequest.java
new file mode 100644
index 0000000..97bda01
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/PreparedRequest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication;
+
+import java.util.Map;
+
+public interface PreparedRequest {
+    String getMethod();
+
+    Map<String, String> getHeaders();
+
+    Object getEntity();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index bd1e4b3..93804be 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -17,47 +17,7 @@
 
 package org.apache.nifi.cluster.coordination.http.replication;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.authorization.AccessDeniedException;
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.authorization.user.NiFiUserUtils;
-import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
-import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
-import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import 
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
-import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
-import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
-import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
-import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
-import org.apache.nifi.cluster.manager.exception.UriConstructionException;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.protocol.http.HttpHeaders;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.ComponentIdGenerator;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.security.ProxiedEntitiesUtils;
-import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
-import org.glassfish.jersey.client.ClientProperties;
-import org.glassfish.jersey.client.filter.EncodingFilter;
-import org.glassfish.jersey.message.GZipEncoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -66,7 +26,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -86,13 +45,40 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import 
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
+import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.ComponentIdGenerator;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class ThreadPoolRequestReplicator implements RequestReplicator {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
 
-    private final Client client; // the client to use for issuing requests
-    private final int connectionTimeoutMs; // connection timeout per node 
request
-    private final int readTimeoutMs; // read timeout per node request
     private final int maxConcurrentRequests; // maximum number of concurrent 
requests
     private final HttpResponseMapper responseMapper;
     private final EventReporter eventReporter;
@@ -110,22 +96,8 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
 
-    /**
-     * Creates an instance using a connection timeout and read timeout of 3 
seconds
-     *
-     * @param corePoolSize core size of the thread pool
-     * @param maxPoolSize the max number of threads in the thread pool
-     * @param maxConcurrentRequests maximum number of concurrent requests
-     * @param client a client for making requests
-     * @param clusterCoordinator the cluster coordinator to use for 
interacting with node statuses
-     * @param callback a callback that will be called whenever all of the 
responses have been gathered for a request. May be null.
-     * @param eventReporter an EventReporter that can be used to notify users 
of interesting events. May be null.
-     * @param nifiProperties properties
-     */
-    public ThreadPoolRequestReplicator(final int corePoolSize, final int 
maxPoolSize, final int maxConcurrentRequests, final Client client, final 
ClusterCoordinator clusterCoordinator,
-                                       final RequestCompletionCallback 
callback, final EventReporter eventReporter, final NiFiProperties 
nifiProperties) {
-        this(corePoolSize, maxPoolSize, maxConcurrentRequests, client, 
clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties);
-    }
+    private HttpReplicationClient httpClient;
+
 
     /**
      * Creates an instance.
@@ -135,15 +107,12 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      * @param maxConcurrentRequests maximum number of concurrent requests
      * @param client a client for making requests
      * @param clusterCoordinator the cluster coordinator to use for 
interacting with node statuses
-     * @param connectionTimeout the connection timeout specified in 
milliseconds
-     * @param readTimeout the read timeout specified in milliseconds
      * @param callback a callback that will be called whenever all of the 
responses have been gathered for a request. May be null.
      * @param eventReporter an EventReporter that can be used to notify users 
of interesting events. May be null.
      * @param nifiProperties properties
      */
-    public ThreadPoolRequestReplicator(final int corePoolSize, final int 
maxPoolSize, final int maxConcurrentRequests, final Client client, final 
ClusterCoordinator clusterCoordinator,
-                                       final String connectionTimeout, final 
String readTimeout, final RequestCompletionCallback callback,
-                                       final EventReporter eventReporter, 
final NiFiProperties nifiProperties) {
+    public ThreadPoolRequestReplicator(final int corePoolSize, final int 
maxPoolSize, final int maxConcurrentRequests, final HttpReplicationClient 
client,
+        final ClusterCoordinator clusterCoordinator, final 
RequestCompletionCallback callback, final EventReporter eventReporter, final 
NiFiProperties nifiProperties) {
         if (corePoolSize <= 0) {
             throw new IllegalArgumentException("The Core Pool Size must be 
greater than zero.");
         } else if (maxPoolSize < corePoolSize) {
@@ -152,19 +121,13 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             throw new IllegalArgumentException("Client may not be null.");
         }
 
-        this.client = client;
         this.clusterCoordinator = clusterCoordinator;
-        this.connectionTimeoutMs = (int) 
FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
-        this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, 
TimeUnit.MILLISECONDS);
         this.maxConcurrentRequests = maxConcurrentRequests;
         this.responseMapper = new StandardHttpResponseMapper(nifiProperties);
         this.eventReporter = eventReporter;
         this.callback = callback;
         this.nifiProperties = nifiProperties;
-
-        client.property(ClientProperties.CONNECT_TIMEOUT, connectionTimeoutMs);
-        client.property(ClientProperties.READ_TIMEOUT, readTimeoutMs);
-        client.property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE);
+        this.httpClient = client;
 
         final AtomicInteger threadId = new AtomicInteger(0);
         final ThreadFactory threadFactory = r -> {
@@ -480,8 +443,10 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             }
 
             // replicate the request to all nodes
+            final PreparedRequest request = httpClient.prepareRequest(method, 
updatedHeaders, entity);
             final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
-                nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, 
nodeId), entity, updatedHeaders, nodeCompletionCallback, finalResponse);
+                nodeId -> new NodeHttpRequest(request, nodeId, createURI(uri, 
nodeId), nodeCompletionCallback, finalResponse);
+
             submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), 
requestFactory, updatedHeaders);
 
             return response;
@@ -557,8 +522,9 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
                                 public void run() {
                                     logger.debug("Found {} dissenting nodes 
for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
 
+                                    final PreparedRequest request = 
httpClient.prepareRequest(method, cancelLockHeaders, entity);
                                     final Function<NodeIdentifier, 
NodeHttpRequest> requestFactory =
-                                        nodeId -> new NodeHttpRequest(nodeId, 
method, createURI(uri, nodeId), entity, cancelLockHeaders, null, 
clusterResponse);
+                                        nodeId -> new NodeHttpRequest(request, 
nodeId, createURI(uri, nodeId), null, clusterResponse);
 
                                     submitAsyncRequest(nodeIds, 
uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
                                 }
@@ -632,8 +598,9 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         };
 
         // Callback function for generating a NodeHttpRequestCallable that can 
be used to perform the work
-        final Function<NodeIdentifier, NodeHttpRequest> requestFactory = 
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, 
validationHeaders, completionCallback,
-            clusterResponse);
+        final PreparedRequest request = httpClient.prepareRequest(method, 
validationHeaders, entity);
+        final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
+            nodeId -> new NodeHttpRequest(request, nodeId, createURI(uri, 
nodeId), completionCallback, clusterResponse);
 
         // replicate the 'verification request' to all nodes
         submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), 
requestFactory, validationHeaders);
@@ -651,18 +618,19 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
     }
 
     // Visible for testing - overriding this method makes it easy to verify 
behavior without actually making any web requests
-    protected NodeResponse replicateRequest(final Invocation invocation, final 
NodeIdentifier nodeId, final String method, final URI uri, final String 
requestId,
-        final Map<String, String> headers, final StandardAsyncClusterResponse 
clusterResponse) {
+    protected NodeResponse replicateRequest(final PreparedRequest request, 
final NodeIdentifier nodeId, final URI uri, final String requestId,
+            final StandardAsyncClusterResponse clusterResponse) throws 
IOException {
+
         final Response response;
         final long startNanos = System.nanoTime();
-        logger.debug("Replicating request to {} {}, request ID = {}, headers = 
{}", method, uri, requestId, headers);
+        logger.debug("Replicating request to {} {}, request ID = {}, headers = 
{}", request.getMethod(), uri, requestId, request.getHeaders());
 
         // invoke the request
-        response = invocation.invoke();
+        response = httpClient.replicate(request, uri.toString());
 
         final long nanos = System.nanoTime() - startNanos;
         clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), 
nanos);
-        final NodeResponse nodeResponse = new NodeResponse(nodeId, method, 
uri, response, System.nanoTime() - startNanos, requestId);
+        final NodeResponse nodeResponse = new NodeResponse(nodeId, 
request.getMethod(), uri, response, System.nanoTime() - startNanos, requestId);
         if (nodeResponse.is2xx()) {
             final int length = nodeResponse.getClientResponse().getLength();
             if (length > 0) {
@@ -822,20 +790,17 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         private final NodeIdentifier nodeId;
         private final String method;
         private final URI uri;
-        private final Object entity;
-        private final Map<String, String> headers = new HashMap<>();
         private final NodeRequestCompletionCallback callback;
         private final StandardAsyncClusterResponse clusterResponse;
         private final long creationNanos = System.nanoTime();
-        private final GZipEncoder gzipEncoder = new GZipEncoder();
+        private final PreparedRequest request;
 
-        private NodeHttpRequest(final NodeIdentifier nodeId, final String 
method, final URI uri, final Object entity, final Map<String, String> headers,
-            final NodeRequestCompletionCallback callback, final 
StandardAsyncClusterResponse clusterResponse) {
+        private NodeHttpRequest(final PreparedRequest request, final 
NodeIdentifier nodeId, final URI uri,
+                final NodeRequestCompletionCallback callback, final 
StandardAsyncClusterResponse clusterResponse) {
+            this.request = request;
             this.nodeId = nodeId;
-            this.method = method;
+            this.method = request.getMethod();
             this.uri = uri;
-            this.entity = entity;
-            this.headers.putAll(headers);
             this.callback = callback;
             this.clusterResponse = clusterResponse;
         }
@@ -849,30 +814,11 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             NodeResponse nodeResponse;
 
             try {
-                final String rawAcceptEncoding = 
headers.get(HttpHeaders.ACCEPT_ENCODING);
-
-                final boolean useGzip;
-                if (rawAcceptEncoding == null) {
-                    useGzip = false;
-                } else {
-                    final String[] acceptEncodingTokens = 
rawAcceptEncoding.split(",");
-                    final Set<String> acceptEncoding = 
Stream.of(acceptEncodingTokens)
-                            .map(String::trim)
-                            .filter(enc -> StringUtils.isNotEmpty(enc))
-                            .map(String::toLowerCase)
-                            .collect(Collectors.toSet());
-
-                    final Set<String> supportedEncodings = 
gzipEncoder.getSupportedEncodings();
-                    useGzip = supportedEncodings.stream()
-                            .anyMatch(supportedEncoding -> 
acceptEncoding.contains(supportedEncoding.toLowerCase()));
-                }
-
                 // create and send the request
-                final Invocation invocation = createInvocation(useGzip);
-                final String requestId = headers.get("x-nifi-request-id");
-
+                final String requestId = 
request.getHeaders().get("x-nifi-request-id");
                 logger.debug("Replicating request {} {} to {}", method, 
uri.getPath(), nodeId);
-                nodeResponse = replicateRequest(invocation, nodeId, method, 
uri, requestId, headers, clusterResponse);
+
+                nodeResponse = replicateRequest(request, nodeId, uri, 
requestId, clusterResponse);
             } catch (final Exception e) {
                 nodeResponse = new NodeResponse(nodeId, method, uri, e);
                 logger.warn("Failed to replicate request {} {} to {} due to 
{}", method, uri.getPath(), nodeId, e.toString());
@@ -884,66 +830,6 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
                 callback.onCompletion(nodeResponse);
             }
         }
-
-        private Invocation createInvocation(final boolean useGzip) {
-            // convert parameters to a more convenient data structure
-            final MultivaluedHashMap<String, String> map = new 
MultivaluedHashMap();
-
-            if (entity instanceof MultivaluedMap) {
-                map.putAll((Map) entity);
-            }
-
-            // create the resource
-            WebTarget webTarget = client.target(uri);
-
-            if (useGzip) {
-                webTarget = 
webTarget.register(EncodingFilter.class).register(gzipEncoder);
-            }
-
-            final Invocation invocation;
-
-            // set the parameters as either query parameters or as request body
-            if (HttpMethod.DELETE.equalsIgnoreCase(method) || 
HttpMethod.HEAD.equalsIgnoreCase(method) || 
HttpMethod.GET.equalsIgnoreCase(method) || 
HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
-                for (final Entry<String, List<String>> queryEntry : 
map.entrySet()) {
-                    webTarget = webTarget.queryParam(queryEntry.getKey(), 
queryEntry.getValue().toArray());
-                }
-
-                Invocation.Builder builder = webTarget.request();
-                for (final Map.Entry<String, String> entry : 
headers.entrySet()) {
-                    builder = builder.header(entry.getKey(), entry.getValue());
-                }
-
-                invocation = builder.build(method);
-            } else {
-                Invocation.Builder builder = webTarget.request();
-
-                // detect the content type
-                String contentType = null;
-                for (final Map.Entry<String, String> entry : 
headers.entrySet()) {
-                    builder.header(entry.getKey(), entry.getValue());
-
-                    // record the content type
-                    if (entry.getKey().equalsIgnoreCase("content-type")) {
-                        contentType = entry.getValue();
-                    }
-
-                    // never break
-                }
-
-                // set default content type
-                if (contentType == null) {
-                    contentType = MediaType.APPLICATION_FORM_URLENCODED;
-                }
-
-                if (entity == null) {
-                    invocation = builder.build(method, Entity.entity(map, 
contentType));
-                } else {
-                    invocation = builder.build(method, Entity.entity(entity, 
contentType));
-                }
-            }
-
-            return invocation;
-        }
     }
 
     private static interface NodeRequestCompletionCallback {

http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
new file mode 100644
index 0000000..19028e3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface EntitySerializer {
+    void serialize(Object entity, OutputStream out) throws IOException;
+}

Reply via email to