NIFI-950: Make component validation asynchronous NIFI-950: Still seeing some slow response times when instantiating a large template in cluster mode so making some minor tweaks based on the results of CPU profiling NIFI-5112: Refactored FlowSerializer so that it creates the desired intermediate data model that can be serialized, separate from serializing. This allows us to hold the FlowController's Read Lock only while creating the data model, not while actually serializing the data. Configured Jersey Client in ThreadPoolRequestReplicator not to look for features using the Service Loader for every request. Updated Template object to hold a DOM Node that represents the template contents instead of having to serialize the DTO, then parse the serialized form as a DOM object each time that it needs to be serialized. NIFI-5112: Change ThreadPoolRequestReplicator to use OkHttp client instead of Jersey Client NIFI-5111: Ensure that if a node is no longer cluster coordinator, that it clears any stale heartbeats. NIFI-5110: Notify StandardProcessScheduler when a component is removed so that it will clean up any resource related to component lifecycle. NIFI-950: Avoid gathering the Status objects for entire flow when we don't need them; removed unnecessary code NIFI-950: Bug fixes NIFI-950: Bug fix; added validation status to ProcessorDTO, ControllerServiceDTO, ReportingTaskDTO; updated DebugFlow to allow for pause time to be set in the customValidate method for testing functionality NIFI-950: Addressing test failures NIFI-950: Bug fixes NIFI-950: Addressing review feedback NIFI-950: Fixed validation logic in mock framework This closes #2693
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/604656fe Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/604656fe Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/604656fe Branch: refs/heads/master Commit: 604656fe8829a79549c24d34c0149891e0edebe9 Parents: ea41b41 Author: Mark Payne <marka...@hotmail.com> Authored: Wed Apr 11 15:36:54 2018 -0400 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Wed May 16 14:39:23 2018 -0400 ---------------------------------------------------------------------- .../nifi/components/ConfigurableComponent.java | 6 +- .../nifi/components/PropertyDescriptor.java | 92 +-- .../nifi/components/ValidationResult.java | 2 +- .../org/apache/nifi/components/Validator.java | 2 +- .../nifi/web/util/NiFiHostnameVerifier.java | 61 ++ .../java/org/apache/nifi/web/util/WebUtils.java | 33 +- .../apache/nifi/util/MockProcessContext.java | 69 +- .../nifi/schema/access/SchemaAccessUtils.java | 4 +- .../AccessPolicyProviderInvocationHandler.java | 12 +- .../AuthorizationAuditorInvocationHandler.java | 11 +- .../AuthorizerInvocationHandler.java | 11 +- .../nifi/web/api/dto/ControllerServiceDTO.java | 15 + .../apache/nifi/web/api/dto/FlowSnippetDTO.java | 2 +- .../apache/nifi/web/api/dto/ProcessorDTO.java | 15 + .../nifi/web/api/dto/ReportingTaskDTO.java | 15 + .../authorization/resource/ResourceFactory.java | 10 +- .../nifi-framework-cluster/pom.xml | 71 +- .../heartbeat/AbstractHeartbeatMonitor.java | 5 +- .../http/replication/HttpReplicationClient.java | 31 + .../http/replication/PreparedRequest.java | 28 + .../ThreadPoolRequestReplicator.java | 230 ++---- .../replication/okhttp/EntitySerializer.java | 25 + .../replication/okhttp/JacksonResponse.java | 237 ++++++ .../okhttp/JsonEntitySerializer.java | 42 ++ .../okhttp/OkHttpPreparedRequest.java | 62 ++ .../okhttp/OkHttpReplicationClient.java | 366 ++++++++++ .../replication/okhttp/XmlEntitySerializer.java | 60 ++ .../manager/ControllerServiceEntityMerger.java | 17 +- .../nifi/cluster/manager/ErrorMerger.java | 31 +- .../cluster/manager/ProcessorEntityMerger.java | 7 + .../manager/ReportingTaskEntityMerger.java | 7 + .../ThreadPoolRequestReplicatorFactoryBean.java | 15 +- .../TestThreadPoolRequestReplicator.java | 118 +-- .../okhttp/TestJsonEntitySerializer.java | 97 +++ .../replication/util/MockReplicationClient.java | 217 ++++++ .../ControllerServiceEntityMergerSpec.groovy | 4 +- .../DisabledServiceValidationResult.java | 42 ++ .../components/validation/ValidationState.java | 40 + .../components/validation/ValidationStatus.java | 35 + .../validation/ValidationTrigger.java | 36 + .../nifi/controller/AbstractComponentNode.java | 728 +++++++++++++++++++ .../controller/AbstractConfiguredComponent.java | 607 ---------------- .../apache/nifi/controller/ComponentNode.java | 201 +++++ .../nifi/controller/ConfiguredComponent.java | 154 ---- .../nifi/controller/ProcessScheduler.java | 28 +- .../apache/nifi/controller/ProcessorNode.java | 10 +- .../nifi/controller/ReportingTaskNode.java | 2 +- .../org/apache/nifi/controller/Template.java | 21 + .../ControllerServiceDisabledException.java | 31 + .../service/ControllerServiceNode.java | 19 +- .../service/ControllerServiceProvider.java | 10 +- .../service/ControllerServiceReference.java | 6 +- .../org/apache/nifi/groups/ProcessGroup.java | 9 +- .../controller/TestAbstractComponentNode.java | 126 ++++ .../validation/StandardValidationTrigger.java | 59 ++ .../validation/TriggerValidationTask.java | 56 ++ .../apache/nifi/controller/FlowController.java | 272 ++++--- .../nifi/controller/StandardProcessorNode.java | 409 ++++++----- .../apache/nifi/controller/TemplateUtils.java | 2 + .../reporting/AbstractReportingTaskNode.java | 43 +- .../reporting/StandardReportingTaskNode.java | 9 +- .../scheduling/StandardProcessScheduler.java | 26 +- .../serialization/FlowSerializer.java | 16 +- .../serialization/StandardFlowSerializer.java | 87 ++- .../service/ControllerServiceLoader.java | 9 + .../service/ServiceStateTransition.java | 20 + .../service/StandardConfigurationContext.java | 6 +- ...ndardControllerServiceInvocationHandler.java | 11 +- .../service/StandardControllerServiceNode.java | 89 ++- .../StandardControllerServiceProvider.java | 29 +- .../StandardControllerServiceReference.java | 24 +- .../nifi/groups/StandardProcessGroup.java | 78 +- .../nifi/persistence/TemplateDeserializer.java | 17 +- .../nifi/persistence/TemplateSerializer.java | 13 +- .../processor/StandardSchedulingContext.java | 8 +- .../processor/StandardValidationContext.java | 16 +- .../flow/mapping/NiFiRegistryFlowMapper.java | 6 +- .../apache/nifi/util/ClassAnnotationPair.java | 61 ++ .../org/apache/nifi/util/ReflectionUtils.java | 79 +- .../controller/StandardFlowServiceTest.java | 22 +- .../nifi/controller/TestFlowController.java | 17 +- .../controller/TestStandardProcessorNode.java | 37 +- .../scheduling/StandardProcessSchedulerIT.java | 6 +- .../scheduling/TestProcessorLifecycle.java | 139 +--- .../TestStandardProcessScheduler.java | 43 +- .../StandardFlowSerializerTest.java | 4 +- .../StandardControllerServiceProviderIT.java | 4 +- .../StandardControllerServiceProviderTest.java | 3 +- .../TestStandardControllerServiceProvider.java | 44 +- .../service/mock/MockProcessGroup.java | 4 +- .../nifi/util/SynchronousValidationTrigger.java | 35 + .../src/test/resources/logback-test.xml | 2 +- .../org/apache/nifi/nar/ExtensionManager.java | 2 +- .../nifi/audit/ControllerServiceAuditor.java | 6 +- .../StandardAuthorizableLookup.java | 15 +- .../nifi/web/StandardNiFiServiceFacade.java | 69 +- .../org/apache/nifi/web/api/FlowResource.java | 15 +- .../nifi/web/api/ProcessGroupResource.java | 2 +- .../apache/nifi/web/api/SnippetResource.java | 39 +- .../org/apache/nifi/web/api/dto/DtoFactory.java | 19 +- .../nifi/web/controller/ControllerFacade.java | 23 +- .../web/controller/ControllerSearchService.java | 5 +- .../nifi/web/dao/ControllerServiceDAO.java | 4 +- .../dao/impl/StandardControllerServiceDAO.java | 6 +- .../nifi/web/dao/impl/StandardProcessorDAO.java | 2 +- .../nifi/processors/standard/DebugFlow.java | 30 +- .../apache/nifi/ssl/SSLContextServiceTest.java | 20 +- 107 files changed, 4205 insertions(+), 1992 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java b/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java index cd8c781..2f693da 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/ConfigurableComponent.java @@ -26,13 +26,13 @@ public interface ConfigurableComponent { /** * Validates a set of properties, returning ValidationResults for any * invalid properties. All defined properties will be validated. If they are - * not included in the in the purposed configuration, the default value will + * not included in the purposed configuration, the default value will * be used. * * @param context of validation * @return Collection of validation result objects for any invalid findings - * only. If the collection is empty then the component is valid. Guaranteed - * non-null + * only. If the collection is empty then the component is valid. Guaranteed + * non-null */ Collection<ValidationResult> validate(ValidationContext context); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java index 3ac7510..3e767ba 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java @@ -140,9 +140,11 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> */ public ValidationResult validate(final String input, final ValidationContext context) { ValidationResult lastResult = Validator.INVALID.validate(this.name, input, context); + if (allowableValues != null && !allowableValues.isEmpty()) { final ConstrainedSetValidator csValidator = new ConstrainedSetValidator(allowableValues); final ValidationResult csResult = csValidator.validate(this.name, input, context); + if (csResult.isValid()) { lastResult = csResult; } else { @@ -150,90 +152,32 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> } } - // if the property descriptor identifies a Controller Service, validate that the ControllerService exists, is of the correct type, and is valid - if (controllerServiceDefinition != null) { - final Set<String> validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition); - if (validIdentifiers != null && validIdentifiers.contains(input)) { - final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(input); - if (!context.isValidationRequired(controllerService)) { - return new ValidationResult.Builder() - .input(input) - .subject(getName()) - .valid(true) - .build(); - } - - final String serviceId = controllerService.getIdentifier(); - if (!isDependentServiceEnableable(context, serviceId)) { - return new ValidationResult.Builder() - .input(context.getControllerServiceLookup().getControllerServiceName(serviceId)) - .subject(getName()) - .valid(false) - .explanation("Controller Service " + controllerService + " is disabled") - .build(); - } - - final Collection<ValidationResult> validationResults = controllerService.validate(context.getControllerServiceValidationContext(controllerService)); - final List<ValidationResult> invalidResults = new ArrayList<>(); - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - invalidResults.add(result); - } - } - if (!invalidResults.isEmpty()) { - return new ValidationResult.Builder() - .input(input) - .subject(getName()) - .valid(false) - .explanation("Controller Service is not valid: " + (invalidResults.size() > 1 ? invalidResults : invalidResults.get(0))) - .build(); - } + for (final Validator validator : validators) { + lastResult = validator.validate(this.name, input, context); + if (!lastResult.isValid()) { + break; + } + } + if (getControllerServiceDefinition() != null) { + final ControllerService service = context.getControllerServiceLookup().getControllerService(input); + if (service == null) { return new ValidationResult.Builder() - .input(input) - .subject(getName()) - .valid(true) - .build(); + .input(input) + .subject(getDisplayName()) + .valid(false) + .explanation("Property references a Controller Service that does not exist") + .build(); } else { return new ValidationResult.Builder() - .input(input) - .subject(getName()) - .valid(false) - .explanation("Invalid Controller Service: " + input + " is not a valid Controller Service Identifier or does not reference the correct type of Controller Service") - .build(); + .valid(true) + .build(); } } - for (final Validator validator : validators) { - lastResult = validator.validate(this.name, input, context); - if (!lastResult.isValid()) { - break; - } - } return lastResult; } - /** - * Will validate if the dependent service (service identified with the - * 'serviceId') is 'enableable' which means that the dependent service is - * either in ENABLING or ENABLED state. The important issue here is to - * understand the order in which states are assigned: - * - * - Upon the initialization of the service its state is set to ENABLING. - * - * - Transition to ENABLED will happen asynchronously. - * - * So we check first for ENABLING state and if it succeeds we skip the check - * for ENABLED state even though by the time this method returns the - * dependent service's state could be fully ENABLED. - */ - private boolean isDependentServiceEnableable(final ValidationContext context, final String serviceId) { - boolean enableable = context.getControllerServiceLookup().isControllerServiceEnabling(serviceId); - if (!enableable) { - enableable = context.getControllerServiceLookup().isControllerServiceEnabled(serviceId); - } - return enableable; - } public static final class Builder { http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java b/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java index e0beec8..969c2d7 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java @@ -30,7 +30,7 @@ public class ValidationResult { private final String explanation; private final boolean valid; - private ValidationResult(final Builder builder) { + protected ValidationResult(final Builder builder) { this.subject = builder.subject; this.input = builder.input; this.explanation = builder.explanation; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-api/src/main/java/org/apache/nifi/components/Validator.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/Validator.java b/nifi-api/src/main/java/org/apache/nifi/components/Validator.java index a12b532..3befdf8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/Validator.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/Validator.java @@ -28,7 +28,7 @@ public interface Validator { Validator INVALID = new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - return new ValidationResult.Builder().subject(subject).explanation(String.format("'%s' is not a supported property", subject)).input(input).build(); + return new ValidationResult.Builder().subject(subject).explanation(String.format("'%s' is not a supported property or has no Validator associated with it", subject)).input(input).build(); } }; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/NiFiHostnameVerifier.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/NiFiHostnameVerifier.java b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/NiFiHostnameVerifier.java new file mode 100644 index 0000000..960af58 --- /dev/null +++ b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/NiFiHostnameVerifier.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.util; + +import java.security.cert.Certificate; +import java.security.cert.CertificateParsingException; +import java.security.cert.X509Certificate; +import java.util.List; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; + +import org.apache.nifi.security.util.CertificateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NiFiHostnameVerifier implements HostnameVerifier { + private static final Logger logger = LoggerFactory.getLogger(NiFiHostnameVerifier.class); + + @Override + public boolean verify(final String hostname, final SSLSession ssls) { + try { + for (final Certificate peerCertificate : ssls.getPeerCertificates()) { + if (peerCertificate instanceof X509Certificate) { + final X509Certificate x509Cert = (X509Certificate) peerCertificate; + final String dn = x509Cert.getSubjectDN().getName(); + final String commonName = CertificateUtils.extractUsername(dn); + if (commonName.equals(hostname)) { + return true; + } + + final List<String> subjectAltNames = CertificateUtils.getSubjectAlternativeNames(x509Cert); + if (subjectAltNames.contains(hostname.toLowerCase())) { + return true; + } + } + } + } catch (final SSLPeerUnverifiedException | CertificateParsingException ex) { + logger.warn("Hostname Verification encountered exception verifying hostname due to: " + ex, ex); + } + + return false; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java index 351d7f9..21a41fd 100644 --- a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java +++ b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java @@ -17,23 +17,18 @@ package org.apache.nifi.web.util; import java.net.URI; -import java.security.cert.Certificate; -import java.security.cert.CertificateParsingException; -import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.net.ssl.HostnameVerifier; + import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.UriBuilderException; + import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.security.util.CertificateUtils; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.jackson.internal.jackson.jaxrs.json.JacksonJaxbJsonProvider; import org.slf4j.Logger; @@ -100,29 +95,7 @@ public final class WebUtils { if (ctx != null) { // custom hostname verifier that checks subject alternative names against the hostname of the URI - final HostnameVerifier hostnameVerifier = new HostnameVerifier() { - @Override - public boolean verify(final String hostname, final SSLSession ssls) { - - try { - for (final Certificate peerCertificate : ssls.getPeerCertificates()) { - if (peerCertificate instanceof X509Certificate) { - final X509Certificate x509Cert = (X509Certificate) peerCertificate; - final List<String> subjectAltNames = CertificateUtils.getSubjectAlternativeNames(x509Cert); - if (subjectAltNames.contains(hostname.toLowerCase())) { - return true; - } - } - } - } catch (final SSLPeerUnverifiedException | CertificateParsingException ex) { - logger.warn("Hostname Verification encountered exception verifying hostname due to: " + ex, ex); - } - - return false; - } - }; - - clientBuilder = clientBuilder.sslContext(ctx).hostnameVerifier(hostnameVerifier); + clientBuilder = clientBuilder.sslContext(ctx).hostnameVerifier(new NiFiHostnameVerifier()); } clientBuilder = clientBuilder.register(ObjectMapperResolver.class).register(JacksonJaxbJsonProvider.class); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 3448524..a16216d 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -18,6 +18,7 @@ package org.apache.nifi.util; import static java.util.Objects.requireNonNull; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -34,6 +35,7 @@ import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; @@ -240,7 +242,72 @@ public class MockProcessContext extends MockControllerServiceLookup implements S * non-null */ public Collection<ValidationResult> validate() { - return component.validate(new MockValidationContext(this, stateManager, variableRegistry)); + final List<ValidationResult> results = new ArrayList<>(); + final ValidationContext validationContext = new MockValidationContext(this, stateManager, variableRegistry); + final Collection<ValidationResult> componentResults = component.validate(validationContext); + results.addAll(componentResults); + + final Collection<ValidationResult> serviceResults = validateReferencedControllerServices(validationContext); + results.addAll(serviceResults); + return results; + } + + protected final Collection<ValidationResult> validateReferencedControllerServices(final ValidationContext validationContext) { + final List<PropertyDescriptor> supportedDescriptors = component.getPropertyDescriptors(); + if (supportedDescriptors == null) { + return Collections.emptyList(); + } + + final Collection<ValidationResult> validationResults = new ArrayList<>(); + for (final PropertyDescriptor descriptor : supportedDescriptors) { + if (descriptor.getControllerServiceDefinition() == null) { + // skip properties that aren't for a controller service + continue; + } + + final String controllerServiceId = validationContext.getProperty(descriptor).getValue(); + if (controllerServiceId == null) { + continue; + } + + final ControllerService controllerService = getControllerService(controllerServiceId); + if (controllerService == null) { + final ValidationResult result = new ValidationResult.Builder() + .valid(false) + .subject(descriptor.getDisplayName()) + .input(controllerServiceId) + .explanation("Invalid Controller Service: " + controllerServiceId + " is not a valid Controller Service Identifier") + .build(); + + validationResults.add(result); + continue; + } + + final Class<? extends ControllerService> requiredServiceClass = descriptor.getControllerServiceDefinition(); + if (!requiredServiceClass.isAssignableFrom(controllerService.getClass())) { + final ValidationResult result = new ValidationResult.Builder() + .valid(false) + .subject(descriptor.getDisplayName()) + .input(controllerServiceId) + .explanation("Invalid Controller Service: " + controllerServiceId + " does not implement interface " + requiredServiceClass) + .build(); + + validationResults.add(result); + continue; + } + + final boolean enabled = isControllerServiceEnabled(controllerServiceId); + if (!enabled) { + validationResults.add(new ValidationResult.Builder() + .input(controllerServiceId) + .subject(descriptor.getDisplayName()) + .explanation("Controller Service with ID " + controllerServiceId + " is not enabled") + .valid(false) + .build()); + } + } + + return validationResults; } public boolean isValid() { http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java index 111b02a..82ea240 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java @@ -85,7 +85,7 @@ public class SchemaAccessUtils { .description("Specifies the name of the branch to use when looking up the schema in the Schema Registry property. " + "If the chosen Schema Registry does not support branching, this value will be ignored.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); @@ -95,7 +95,7 @@ public class SchemaAccessUtils { .description("Specifies the version of the schema to lookup in the Schema Registry. " + "If not specified then the latest version of the schema will be retrieved.") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java index e41afa3..63bd2ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AccessPolicyProviderInvocationHandler.java @@ -23,10 +23,18 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; public class AccessPolicyProviderInvocationHandler implements InvocationHandler { - + private static final Method getUserGroupProviderMethod; private final AccessPolicyProvider accessPolicyProvider; private final ClassLoader classLoader; + static { + try { + getUserGroupProviderMethod = AccessPolicyProvider.class.getMethod("getUserGroupProvider"); + } catch (final Exception e) { + throw new RuntimeException("Unable to obtain necessary class information for AccessPolicyProvider", e); + } + } + public AccessPolicyProviderInvocationHandler(final AccessPolicyProvider accessPolicyProvider, final ClassLoader classLoader) { this.accessPolicyProvider = accessPolicyProvider; this.classLoader = classLoader; @@ -35,7 +43,7 @@ public class AccessPolicyProviderInvocationHandler implements InvocationHandler @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(classLoader)) { - if (AccessPolicyProvider.class.getMethod("getUserGroupProvider").equals(method)) { + if (getUserGroupProviderMethod.equals(method)) { final UserGroupProvider userGroupProvider = (UserGroupProvider) method.invoke(accessPolicyProvider, args); if (userGroupProvider == null) { return userGroupProvider; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java index 7f8d76c..1dc81d3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizationAuditorInvocationHandler.java @@ -24,6 +24,15 @@ public class AuthorizationAuditorInvocationHandler implements InvocationHandler private final Authorizer authorizer; private final AuthorizationAuditor auditor; + private static final Method auditAccessAttemptMethod; + + static { + try { + auditAccessAttemptMethod = AuthorizationAuditor.class.getMethod("auditAccessAttempt", AuthorizationRequest.class, AuthorizationResult.class); + } catch (final Exception e) { + throw new RuntimeException("Unable to obtain necessary class information for AccessPolicyProvider", e); + } + } public AuthorizationAuditorInvocationHandler(final Authorizer authorizer, final AuthorizationAuditor auditor) { this.authorizer = authorizer; @@ -33,7 +42,7 @@ public class AuthorizationAuditorInvocationHandler implements InvocationHandler @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { try { - if (AuthorizationAuditor.class.getMethod("auditAccessAttempt", AuthorizationRequest.class, AuthorizationResult.class).equals(method)) { + if (auditAccessAttemptMethod.equals(method)) { return method.invoke(auditor, args); } else { return method.invoke(authorizer, args); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java index 228fe09..c8b6771 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-authorizer/src/main/java/org/apache/nifi/authorization/AuthorizerInvocationHandler.java @@ -24,9 +24,18 @@ import java.lang.reflect.Method; public class AuthorizerInvocationHandler implements InvocationHandler { + private static final Method getAccessPolicyProviderMethod; private final Authorizer authorizer; private final ClassLoader classLoader; + static { + try { + getAccessPolicyProviderMethod = ManagedAuthorizer.class.getMethod("getAccessPolicyProvider"); + } catch (final Exception e) { + throw new RuntimeException("Unable to obtain necessary class information for AccessPolicyProvider", e); + } + } + public AuthorizerInvocationHandler(final Authorizer authorizer, final ClassLoader classLoader) { this.authorizer = authorizer; this.classLoader = classLoader; @@ -35,7 +44,7 @@ public class AuthorizerInvocationHandler implements InvocationHandler { @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(classLoader)) { - if (ManagedAuthorizer.class.getMethod("getAccessPolicyProvider").equals(method)) { + if (getAccessPolicyProviderMethod.equals(method)) { final AccessPolicyProvider accessPolicyProvider = (AccessPolicyProvider) method.invoke(authorizer, args); if (accessPolicyProvider == null) { return accessPolicyProvider; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java index 571598c..dbea636 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ControllerServiceDTO.java @@ -30,6 +30,9 @@ import java.util.Set; */ @XmlType(name = "controllerService") public class ControllerServiceDTO extends ComponentDTO { + public static final String VALID = "VALID"; + public static final String INVALID = "INVALID"; + public static final String VALIDATING = "VALIDATING"; private String name; private String type; @@ -52,6 +55,7 @@ public class ControllerServiceDTO extends ComponentDTO { private Set<ControllerServiceReferencingComponentEntity> referencingComponents; private Collection<String> validationErrors; + private String validationStatus; /** * @return controller service name @@ -298,6 +302,17 @@ public class ControllerServiceDTO extends ComponentDTO { this.validationErrors = validationErrors; } + @ApiModelProperty(value = "Indicates whether the Processor is valid, invalid, or still in the process of validating (i.e., it is unknown whether or not the Processor is valid)", + readOnly = true, + allowableValues = VALID + ", " + INVALID + ", " + VALIDATING) + public String getValidationStatus() { + return validationStatus; + } + + public void setValidationStatus(String validationStatus) { + this.validationStatus = validationStatus; + } + @Override public int hashCode() { final String id = getId(); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java index c4b34f2..be532e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowSnippetDTO.java @@ -196,7 +196,7 @@ public class FlowSnippetDTO { TreeSet<T> components = new TreeSet<>(new Comparator<ComponentDTO>() { @Override public int compare(ComponentDTO c1, ComponentDTO c2) { - return UUID.fromString(c1.getId()).compareTo(UUID.fromString(c2.getId())); + return c1.getId().compareTo(c2.getId()); } }); components.addAll(dtos); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java index b3a6c5e..7fcdbbc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorDTO.java @@ -28,6 +28,9 @@ import java.util.Map; */ @XmlType(name = "processor") public class ProcessorDTO extends ComponentDTO { + public static final String VALID = "VALID"; + public static final String INVALID = "INVALID"; + public static final String VALIDATING = "VALIDATING"; private String name; private String type; @@ -49,6 +52,7 @@ public class ProcessorDTO extends ComponentDTO { private ProcessorConfigDTO config; private Collection<String> validationErrors; + private String validationStatus; public ProcessorDTO() { super(); @@ -306,6 +310,17 @@ public class ProcessorDTO extends ComponentDTO { this.validationErrors = validationErrors; } + @ApiModelProperty(value = "Indicates whether the Processor is valid, invalid, or still in the process of validating (i.e., it is unknown whether or not the Processor is valid)", + readOnly = true, + allowableValues = VALID + ", " + INVALID + ", " + VALIDATING) + public String getValidationStatus() { + return validationStatus; + } + + public void setValidationStatus(String validationStatus) { + this.validationStatus = validationStatus; + } + /** * @return the description for this processor */ http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java index 22bfafb..38f3ec8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ReportingTaskDTO.java @@ -27,6 +27,9 @@ import java.util.Map; */ @XmlType(name = "reportingTask") public class ReportingTaskDTO extends ComponentDTO { + public static final String VALID = "VALID"; + public static final String INVALID = "INVALID"; + public static final String VALIDATING = "VALIDATING"; private String name; private String type; @@ -50,6 +53,7 @@ public class ReportingTaskDTO extends ComponentDTO { private String annotationData; private Collection<String> validationErrors; + private String validationStatus; private Integer activeThreadCount; /** @@ -298,6 +302,17 @@ public class ReportingTaskDTO extends ComponentDTO { this.validationErrors = validationErrors; } + @ApiModelProperty(value = "Indicates whether the Processor is valid, invalid, or still in the process of validating (i.e., it is unknown whether or not the Processor is valid)", + readOnly = true, + allowableValues = VALID + ", " + INVALID + ", " + VALIDATING) + public String getValidationStatus() { + return validationStatus; + } + + public void setValidationStatus(String validationStatus) { + this.validationStatus = validationStatus; + } + /** * @return default scheduling period for the different scheduling strategies */ http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java index c18598b..6f545f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-authorization/src/main/java/org/apache/nifi/authorization/resource/ResourceFactory.java @@ -338,7 +338,7 @@ public final class ResourceFactory { return new Resource() { @Override public String getIdentifier() { - return String.format("%s/%s", RESTRICTED_COMPONENTS_RESOURCE.getIdentifier(), requiredPermission.getPermissionIdentifier()); + return RESTRICTED_COMPONENTS_RESOURCE.getIdentifier() + "/" + requiredPermission.getPermissionIdentifier(); } @Override @@ -374,7 +374,7 @@ public final class ResourceFactory { return new Resource() { @Override public String getIdentifier() { - return String.format("%s%s", ResourceType.DataTransfer.getValue(), resource.getIdentifier()); + return ResourceType.DataTransfer.getValue() + resource.getIdentifier(); } @Override @@ -409,7 +409,7 @@ public final class ResourceFactory { return new Resource() { @Override public String getIdentifier() { - return String.format("%s%s", POLICY_RESOURCE.getIdentifier(), resource.getIdentifier()); + return POLICY_RESOURCE.getIdentifier() + resource.getIdentifier(); } @Override @@ -439,7 +439,7 @@ public final class ResourceFactory { return new Resource() { @Override public String getIdentifier() { - return String.format("%s/%s", resourceType.getValue(), identifier); + return resourceType.getValue() + "/" + identifier; } @Override @@ -500,7 +500,7 @@ public final class ResourceFactory { return new Resource() { @Override public String getIdentifier() { - return String.format("%s%s", DATA_RESOURCE.getIdentifier(), resource.getIdentifier()); + return DATA_RESOURCE.getIdentifier() + resource.getIdentifier(); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml index 540e426..9f020ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml @@ -1,19 +1,16 @@ <?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.nifi</groupId> @@ -89,14 +86,25 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-compress</artifactId> </dependency> + <!-- third party dependencies --> - + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.9.4</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <!-- sun dependencies --> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> </dependency> - + <!-- commons dependencies --> <dependency> <groupId>commons-io</groupId> @@ -106,24 +114,6 @@ <groupId>commons-net</groupId> <artifactId>commons-net</artifactId> </dependency> - - <!-- jersey dependencies --> - <dependency> - <groupId>org.glassfish.jersey.core</groupId> - <artifactId>jersey-client</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish.jersey.media</groupId> - <artifactId>jersey-media-json-jackson</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish.jersey.core</groupId> - <artifactId>jersey-common</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish.jersey.inject</groupId> - <artifactId>jersey-hk2</artifactId> - </dependency> <!-- jackson dependencies --> <dependency> @@ -141,6 +131,11 @@ <artifactId>jackson-module-jaxb-annotations</artifactId> <version>${jackson.version}</version> </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>3.10.0</version> + </dependency> <!-- spring dependencies --> <dependency> @@ -155,7 +150,7 @@ <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </dependency> - + <!-- testing dependencies for ZooKeeper / Curator --> <dependency> <groupId>org.apache.curator</groupId> @@ -173,7 +168,7 @@ <scope>test</scope> </dependency> - <!-- Spock testing dependencies--> + <!-- Spock testing dependencies --> <dependency> <groupId>org.spockframework</groupId> <artifactId>spock-core</artifactId> @@ -198,5 +193,5 @@ </configuration> </plugin> </plugins> - </build> + </build> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 4c251f9..35bf510 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -126,7 +126,10 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { // Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so // on a very large delay. So before we kick the node out of the cluster, we want to first check what the // ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate - // destination for heartbeats. + // destination for heartbeats. In this case, we will also purge any heartbeats that we may have received, + // so that if we are later elected the coordinator, we don't have any stale heartbeats stashed away, which + // could lead to immediately disconnecting nodes when this node is elected coordinator. + purgeHeartbeats(); logger.debug("It appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect."); return; } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java new file mode 100644 index 0000000..bcd3a1f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.replication; + +import java.io.IOException; +import java.util.Map; + +import javax.ws.rs.core.Response; + +public interface HttpReplicationClient { + + PreparedRequest prepareRequest(String method, Map<String, String> headers, Object entity); + + Response replicate(PreparedRequest request, String uri) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/PreparedRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/PreparedRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/PreparedRequest.java new file mode 100644 index 0000000..97bda01 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/PreparedRequest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.replication; + +import java.util.Map; + +public interface PreparedRequest { + String getMethod(); + + Map<String, String> getHeaders(); + + Object getEntity(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index bd1e4b3..93804be 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -17,47 +17,7 @@ package org.apache.nifi.cluster.coordination.http.replication; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.authorization.AccessDeniedException; -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.authorization.user.NiFiUserUtils; -import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; -import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; -import org.apache.nifi.cluster.coordination.node.NodeConnectionState; -import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; -import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; -import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; -import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; -import org.apache.nifi.cluster.manager.exception.UnknownNodeException; -import org.apache.nifi.cluster.manager.exception.UriConstructionException; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.remote.protocol.http.HttpHeaders; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.util.ComponentIdGenerator; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.security.ProxiedEntitiesUtils; -import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; -import org.glassfish.jersey.client.ClientProperties; -import org.glassfish.jersey.client.filter.EncodingFilter; -import org.glassfish.jersey.message.GZipEncoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.HttpMethod; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.Entity; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; @@ -66,7 +26,6 @@ import java.util.HashSet; import java.util.List; import java.util.LongSummaryStatistics; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -86,13 +45,40 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; +import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; +import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; +import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; +import org.apache.nifi.cluster.manager.exception.UnknownNodeException; +import org.apache.nifi.cluster.manager.exception.UriConstructionException; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.ComponentIdGenerator; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.security.ProxiedEntitiesUtils; +import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ThreadPoolRequestReplicator implements RequestReplicator { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); - private final Client client; // the client to use for issuing requests - private final int connectionTimeoutMs; // connection timeout per node request - private final int readTimeoutMs; // read timeout per node request private final int maxConcurrentRequests; // maximum number of concurrent requests private final HttpResponseMapper responseMapper; private final EventReporter eventReporter; @@ -110,22 +96,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); - /** - * Creates an instance using a connection timeout and read timeout of 3 seconds - * - * @param corePoolSize core size of the thread pool - * @param maxPoolSize the max number of threads in the thread pool - * @param maxConcurrentRequests maximum number of concurrent requests - * @param client a client for making requests - * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses - * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. - * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. - * @param nifiProperties properties - */ - public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final Client client, final ClusterCoordinator clusterCoordinator, - final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { - this(corePoolSize, maxPoolSize, maxConcurrentRequests, client, clusterCoordinator, "5 sec", "5 sec", callback, eventReporter, nifiProperties); - } + private HttpReplicationClient httpClient; + /** * Creates an instance. @@ -135,15 +107,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param maxConcurrentRequests maximum number of concurrent requests * @param client a client for making requests * @param clusterCoordinator the cluster coordinator to use for interacting with node statuses - * @param connectionTimeout the connection timeout specified in milliseconds - * @param readTimeout the read timeout specified in milliseconds * @param callback a callback that will be called whenever all of the responses have been gathered for a request. May be null. * @param eventReporter an EventReporter that can be used to notify users of interesting events. May be null. * @param nifiProperties properties */ - public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final Client client, final ClusterCoordinator clusterCoordinator, - final String connectionTimeout, final String readTimeout, final RequestCompletionCallback callback, - final EventReporter eventReporter, final NiFiProperties nifiProperties) { + public ThreadPoolRequestReplicator(final int corePoolSize, final int maxPoolSize, final int maxConcurrentRequests, final HttpReplicationClient client, + final ClusterCoordinator clusterCoordinator, final RequestCompletionCallback callback, final EventReporter eventReporter, final NiFiProperties nifiProperties) { if (corePoolSize <= 0) { throw new IllegalArgumentException("The Core Pool Size must be greater than zero."); } else if (maxPoolSize < corePoolSize) { @@ -152,19 +121,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalArgumentException("Client may not be null."); } - this.client = client; this.clusterCoordinator = clusterCoordinator; - this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); - this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS); this.maxConcurrentRequests = maxConcurrentRequests; this.responseMapper = new StandardHttpResponseMapper(nifiProperties); this.eventReporter = eventReporter; this.callback = callback; this.nifiProperties = nifiProperties; - - client.property(ClientProperties.CONNECT_TIMEOUT, connectionTimeoutMs); - client.property(ClientProperties.READ_TIMEOUT, readTimeoutMs); - client.property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE); + this.httpClient = client; final AtomicInteger threadId = new AtomicInteger(0); final ThreadFactory threadFactory = r -> { @@ -480,8 +443,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } // replicate the request to all nodes + final PreparedRequest request = httpClient.prepareRequest(method, updatedHeaders, entity); final Function<NodeIdentifier, NodeHttpRequest> requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback, finalResponse); + nodeId -> new NodeHttpRequest(request, nodeId, createURI(uri, nodeId), nodeCompletionCallback, finalResponse); + submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); return response; @@ -557,8 +522,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { public void run() { logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath()); + final PreparedRequest request = httpClient.prepareRequest(method, cancelLockHeaders, entity); final Function<NodeIdentifier, NodeHttpRequest> requestFactory = - nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null, clusterResponse); + nodeId -> new NodeHttpRequest(request, nodeId, createURI(uri, nodeId), null, clusterResponse); submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders); } @@ -632,8 +598,9 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { }; // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work - final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback, - clusterResponse); + final PreparedRequest request = httpClient.prepareRequest(method, validationHeaders, entity); + final Function<NodeIdentifier, NodeHttpRequest> requestFactory = + nodeId -> new NodeHttpRequest(request, nodeId, createURI(uri, nodeId), completionCallback, clusterResponse); // replicate the 'verification request' to all nodes submitAsyncRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders); @@ -651,18 +618,19 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } // Visible for testing - overriding this method makes it easy to verify behavior without actually making any web requests - protected NodeResponse replicateRequest(final Invocation invocation, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, - final Map<String, String> headers, final StandardAsyncClusterResponse clusterResponse) { + protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId, final URI uri, final String requestId, + final StandardAsyncClusterResponse clusterResponse) throws IOException { + final Response response; final long startNanos = System.nanoTime(); - logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", method, uri, requestId, headers); + logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", request.getMethod(), uri, requestId, request.getHeaders()); // invoke the request - response = invocation.invoke(); + response = httpClient.replicate(request, uri.toString()); final long nanos = System.nanoTime() - startNanos; clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), nanos); - final NodeResponse nodeResponse = new NodeResponse(nodeId, method, uri, response, System.nanoTime() - startNanos, requestId); + final NodeResponse nodeResponse = new NodeResponse(nodeId, request.getMethod(), uri, response, System.nanoTime() - startNanos, requestId); if (nodeResponse.is2xx()) { final int length = nodeResponse.getClientResponse().getLength(); if (length > 0) { @@ -822,20 +790,17 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final NodeIdentifier nodeId; private final String method; private final URI uri; - private final Object entity; - private final Map<String, String> headers = new HashMap<>(); private final NodeRequestCompletionCallback callback; private final StandardAsyncClusterResponse clusterResponse; private final long creationNanos = System.nanoTime(); - private final GZipEncoder gzipEncoder = new GZipEncoder(); + private final PreparedRequest request; - private NodeHttpRequest(final NodeIdentifier nodeId, final String method, final URI uri, final Object entity, final Map<String, String> headers, - final NodeRequestCompletionCallback callback, final StandardAsyncClusterResponse clusterResponse) { + private NodeHttpRequest(final PreparedRequest request, final NodeIdentifier nodeId, final URI uri, + final NodeRequestCompletionCallback callback, final StandardAsyncClusterResponse clusterResponse) { + this.request = request; this.nodeId = nodeId; - this.method = method; + this.method = request.getMethod(); this.uri = uri; - this.entity = entity; - this.headers.putAll(headers); this.callback = callback; this.clusterResponse = clusterResponse; } @@ -849,30 +814,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { NodeResponse nodeResponse; try { - final String rawAcceptEncoding = headers.get(HttpHeaders.ACCEPT_ENCODING); - - final boolean useGzip; - if (rawAcceptEncoding == null) { - useGzip = false; - } else { - final String[] acceptEncodingTokens = rawAcceptEncoding.split(","); - final Set<String> acceptEncoding = Stream.of(acceptEncodingTokens) - .map(String::trim) - .filter(enc -> StringUtils.isNotEmpty(enc)) - .map(String::toLowerCase) - .collect(Collectors.toSet()); - - final Set<String> supportedEncodings = gzipEncoder.getSupportedEncodings(); - useGzip = supportedEncodings.stream() - .anyMatch(supportedEncoding -> acceptEncoding.contains(supportedEncoding.toLowerCase())); - } - // create and send the request - final Invocation invocation = createInvocation(useGzip); - final String requestId = headers.get("x-nifi-request-id"); - + final String requestId = request.getHeaders().get("x-nifi-request-id"); logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId); - nodeResponse = replicateRequest(invocation, nodeId, method, uri, requestId, headers, clusterResponse); + + nodeResponse = replicateRequest(request, nodeId, uri, requestId, clusterResponse); } catch (final Exception e) { nodeResponse = new NodeResponse(nodeId, method, uri, e); logger.warn("Failed to replicate request {} {} to {} due to {}", method, uri.getPath(), nodeId, e.toString()); @@ -884,66 +830,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { callback.onCompletion(nodeResponse); } } - - private Invocation createInvocation(final boolean useGzip) { - // convert parameters to a more convenient data structure - final MultivaluedHashMap<String, String> map = new MultivaluedHashMap(); - - if (entity instanceof MultivaluedMap) { - map.putAll((Map) entity); - } - - // create the resource - WebTarget webTarget = client.target(uri); - - if (useGzip) { - webTarget = webTarget.register(EncodingFilter.class).register(gzipEncoder); - } - - final Invocation invocation; - - // set the parameters as either query parameters or as request body - if (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.HEAD.equalsIgnoreCase(method) || HttpMethod.GET.equalsIgnoreCase(method) || HttpMethod.OPTIONS.equalsIgnoreCase(method)) { - for (final Entry<String, List<String>> queryEntry : map.entrySet()) { - webTarget = webTarget.queryParam(queryEntry.getKey(), queryEntry.getValue().toArray()); - } - - Invocation.Builder builder = webTarget.request(); - for (final Map.Entry<String, String> entry : headers.entrySet()) { - builder = builder.header(entry.getKey(), entry.getValue()); - } - - invocation = builder.build(method); - } else { - Invocation.Builder builder = webTarget.request(); - - // detect the content type - String contentType = null; - for (final Map.Entry<String, String> entry : headers.entrySet()) { - builder.header(entry.getKey(), entry.getValue()); - - // record the content type - if (entry.getKey().equalsIgnoreCase("content-type")) { - contentType = entry.getValue(); - } - - // never break - } - - // set default content type - if (contentType == null) { - contentType = MediaType.APPLICATION_FORM_URLENCODED; - } - - if (entity == null) { - invocation = builder.build(method, Entity.entity(map, contentType)); - } else { - invocation = builder.build(method, Entity.entity(entity, contentType)); - } - } - - return invocation; - } } private static interface NodeRequestCompletionCallback { http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java new file mode 100644 index 0000000..19028e3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.replication.okhttp; + +import java.io.IOException; +import java.io.OutputStream; + +public interface EntitySerializer { + void serialize(Object entity, OutputStream out) throws IOException; +}