http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 640fcf7..b8b3e74 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -47,7 +47,8 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; -import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; @@ -90,9 +91,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private final NiFiProperties nifiProperties; private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>(); + private final ValidationTrigger validationTrigger; public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo, - final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) { + final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties, final ValidationTrigger validationTrigger) { this.flowController = flowController; this.processScheduler = scheduler; @@ -100,6 +102,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi this.stateManagerProvider = stateManagerProvider; this.variableRegistry = variableRegistry; this.nifiProperties = nifiProperties; + this.validationTrigger = validationTrigger; } private StateManager getStateManager(final String componentId) { @@ -160,7 +163,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler, - id, validationContextFactory, this, componentVarRegistry, flowController); + id, validationContextFactory, this, componentVarRegistry, flowController, validationTrigger); serviceNode.setName(rawClass.getSimpleName()); invocationHandler.setServiceNode(serviceNode); @@ -239,20 +242,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id, - new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController, true); + new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController, validationTrigger, true); serviceCache.putIfAbsent(id, serviceNode); return serviceNode; } @Override - public Set<ConfiguredComponent> disableReferencingServices(final ControllerServiceNode serviceNode) { + public Set<ComponentNode> disableReferencingServices(final ControllerServiceNode serviceNode) { // Get a list of all Controller Services that need to be disabled, in the order that they need to be disabled. final List<ControllerServiceNode> toDisable = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable); - final Set<ConfiguredComponent> updated = new HashSet<>(); + final Set<ComponentNode> updated = new HashSet<>(); for (final ControllerServiceNode nodeToDisable : toDisable) { if (nodeToDisable.isActive()) { nodeToDisable.verifyCanDisable(serviceSet); @@ -266,13 +269,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override - public Set<ConfiguredComponent> scheduleReferencingComponents(final ControllerServiceNode serviceNode) { + public Set<ComponentNode> scheduleReferencingComponents(final ControllerServiceNode serviceNode) { // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, // or a service that references this controller service, etc. final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class); final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class); - final Set<ConfiguredComponent> updated = new HashSet<>(); + final Set<ComponentNode> updated = new HashSet<>(); // verify that we can start all components (that are not disabled) before doing anything for (final ProcessorNode node : processors) { @@ -306,13 +309,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } @Override - public Set<ConfiguredComponent> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { + public Set<ComponentNode> unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, // or a service that references this controller service, etc. final List<ProcessorNode> processors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class); final List<ReportingTaskNode> reportingTasks = serviceNode.getReferences().findRecursiveReferences(ReportingTaskNode.class); - final Set<ConfiguredComponent> updated = new HashSet<>(); + final Set<ComponentNode> updated = new HashSet<>(); // verify that we can stop all components (that are running) before doing anything for (final ProcessorNode node : processors) { @@ -720,18 +723,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override - public Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode) { + public Set<ComponentNode> enableReferencingServices(final ControllerServiceNode serviceNode) { final List<ControllerServiceNode> recursiveReferences = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class); logger.debug("Enabling the following Referencing Services for {}: {}", serviceNode, recursiveReferences); return enableReferencingServices(serviceNode, recursiveReferences); } - private Set<ConfiguredComponent> enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) { + private Set<ComponentNode> enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) { if (!serviceNode.isActive()) { serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences)); } - final Set<ConfiguredComponent> updated = new HashSet<>(); + final Set<ComponentNode> updated = new HashSet<>(); final Set<ControllerServiceNode> ifEnabled = new HashSet<>(); for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java index 285b8dc..4cb5239 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java @@ -22,17 +22,17 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; public class StandardControllerServiceReference implements ControllerServiceReference { private final ControllerServiceNode referenced; - private final Set<ConfiguredComponent> components; + private final Set<ComponentNode> components; public StandardControllerServiceReference(final ControllerServiceNode referencedService, - final Set<ConfiguredComponent> referencingComponents) { + final Set<ComponentNode> referencingComponents) { this.referenced = referencedService; this.components = new HashSet<>(referencingComponents); } @@ -43,11 +43,11 @@ public class StandardControllerServiceReference implements ControllerServiceRefe } @Override - public Set<ConfiguredComponent> getReferencingComponents() { + public Set<ComponentNode> getReferencingComponents() { return Collections.unmodifiableSet(components); } - private boolean isRunning(final ConfiguredComponent component) { + private boolean isRunning(final ComponentNode component) { if (component instanceof ReportingTaskNode) { return ((ReportingTaskNode) component).isRunning(); } @@ -60,11 +60,11 @@ public class StandardControllerServiceReference implements ControllerServiceRefe } @Override - public Set<ConfiguredComponent> getActiveReferences() { - final Set<ConfiguredComponent> activeReferences = new HashSet<>(); + public Set<ComponentNode> getActiveReferences() { + final Set<ComponentNode> activeReferences = new HashSet<>(); final Set<ControllerServiceNode> serviceNodes = new HashSet<>(); - for (final ConfiguredComponent component : components) { + for (final ComponentNode component : components) { if (component instanceof ControllerServiceNode) { serviceNodes.add((ControllerServiceNode) component); @@ -80,17 +80,17 @@ public class StandardControllerServiceReference implements ControllerServiceRefe return activeReferences; } - private Set<ConfiguredComponent> getActiveIndirectReferences(final Set<ControllerServiceNode> referencingServices) { + private Set<ComponentNode> getActiveIndirectReferences(final Set<ControllerServiceNode> referencingServices) { if (referencingServices.isEmpty()) { return Collections.emptySet(); } - final Set<ConfiguredComponent> references = new HashSet<>(); + final Set<ComponentNode> references = new HashSet<>(); for (final ControllerServiceNode referencingService : referencingServices) { final Set<ControllerServiceNode> serviceNodes = new HashSet<>(); final ControllerServiceReference ref = referencingService.getReferences(); - for (final ConfiguredComponent component : ref.getReferencingComponents()) { + for (final ComponentNode component : ref.getReferencingComponents()) { if (component instanceof ControllerServiceNode) { serviceNodes.add((ControllerServiceNode) component); } else if (isRunning(component)) { @@ -113,7 +113,7 @@ public class StandardControllerServiceReference implements ControllerServiceRefe private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) { final List<T> references = new ArrayList<>(); - for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) { + for (final ComponentNode referencingComponent : referencedNode.getReferences().getReferencingComponents()) { if (componentType.isAssignableFrom(referencingComponent.getClass())) { references.add(componentType.cast(referencingComponent)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 7357756..96bd65e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -16,6 +16,31 @@ */ package org.apache.nifi.groups; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -32,6 +57,7 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -41,8 +67,8 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Positionable; import org.apache.nifi.connectable.Size; +import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; @@ -119,31 +145,6 @@ import org.apache.nifi.web.api.dto.TemplateDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - public final class StandardProcessGroup implements ProcessGroup { private final String id; @@ -297,7 +298,7 @@ public final class StandardProcessGroup implements ProcessGroup { disabled++; } else if (procNode.isRunning()) { running++; - } else if (!procNode.isValid()) { + } else if (procNode.getValidationStatus() == ValidationStatus.INVALID) { invalid++; } else { stopped++; @@ -543,6 +544,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group"); } + scheduler.onPortRemoved(port); onComponentModified(); flowController.onInputPortRemoved(port); @@ -618,6 +620,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group"); } + scheduler.onPortRemoved(port); onComponentModified(); flowController.onOutputPortRemoved(port); @@ -812,6 +815,9 @@ public final class StandardProcessGroup implements ProcessGroup { LOG.warn("Failed to clean up resources for {} due to {}", remoteGroup, e); } + remoteGroup.getInputPorts().stream().forEach(scheduler::onPortRemoved); + remoteGroup.getOutputPorts().stream().forEach(scheduler::onPortRemoved); + remoteGroups.remove(remoteGroupId); LOG.info("{} removed from flow", remoteProcessGroup); } finally { @@ -847,7 +853,7 @@ public final class StandardProcessGroup implements ProcessGroup { * * @param component the component whose invalid references should be removed */ - private void updateControllerServiceReferences(final ConfiguredComponent component) { + private void updateControllerServiceReferences(final ComponentNode component) { for (final Map.Entry<PropertyDescriptor, String> entry : component.getProperties().entrySet()) { final String serviceId = entry.getValue(); if (serviceId == null) { @@ -917,6 +923,8 @@ public final class StandardProcessGroup implements ProcessGroup { scheduler.onProcessorRemoved(processor); flowController.onProcessorRemoved(processor); + LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); + final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider(); scheduler.submitFrameworkTask(new Runnable() { @Override @@ -946,10 +954,10 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public Set<ProcessorNode> getProcessors() { + public Collection<ProcessorNode> getProcessors() { readLock.lock(); try { - return new LinkedHashSet<>(processors.values()); + return processors.values(); } finally { readLock.unlock(); } @@ -2163,7 +2171,7 @@ public final class StandardProcessGroup implements ProcessGroup { // and notify the Process Group that a component has been modified. This way, we know to re-calculate // whether or not the Process Group has local modifications. service.getReferences().getReferencingComponents().stream() - .map(ConfiguredComponent::getProcessGroupIdentifier) + .map(ComponentNode::getProcessGroupIdentifier) .filter(id -> !id.equals(getIdentifier())) .forEach(groupId -> { final ProcessGroup descendant = findProcessGroup(groupId); @@ -2935,8 +2943,8 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public Set<ConfiguredComponent> getComponentsAffectedByVariable(final String variableName) { - final Set<ConfiguredComponent> affected = new HashSet<>(); + public Set<ComponentNode> getComponentsAffectedByVariable(final String variableName) { + final Set<ComponentNode> affected = new HashSet<>(); // Determine any Processors that references the variable for (final ProcessorNode processor : getProcessors()) { @@ -2956,7 +2964,7 @@ public final class StandardProcessGroup implements ProcessGroup { affected.add(service); final ControllerServiceReference reference = service.getReferences(); - affected.addAll(reference.findRecursiveReferences(ConfiguredComponent.class)); + affected.addAll(reference.findRecursiveReferences(ComponentNode.class)); } } } @@ -2994,7 +3002,7 @@ public final class StandardProcessGroup implements ProcessGroup { return updatedVariableNames; } - private List<VariableImpact> getVariableImpact(final ConfiguredComponent component) { + private List<VariableImpact> getVariableImpact(final ComponentNode component) { return component.getProperties().keySet().stream() .map(descriptor -> { final String configuredVal = component.getProperty(descriptor); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java index d79e9e7..b19173b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateDeserializer.java @@ -31,16 +31,25 @@ import java.io.InputStream; public class TemplateDeserializer { + private static final JAXBContext jaxbContext; + + static { + try { + jaxbContext = JAXBContext.newInstance(TemplateDTO.class); + } catch (final JAXBException e) { + throw new RuntimeException("Cannot create JAXBContext for serializing templates", e); + } + } + public static TemplateDTO deserialize(final InputStream inStream) { return deserialize(new StreamSource(inStream)); } public static TemplateDTO deserialize(final StreamSource source) { try { - JAXBContext context = JAXBContext.newInstance(TemplateDTO.class); - XMLStreamReader xsr = XmlUtils.createSafeReader(source); - Unmarshaller unmarshaller = context.createUnmarshaller(); - JAXBElement<TemplateDTO> templateElement = unmarshaller.unmarshal(xsr, TemplateDTO.class); + final XMLStreamReader xsr = XmlUtils.createSafeReader(source); + final Unmarshaller unmarshaller = jaxbContext.createUnmarshaller(); + final JAXBElement<TemplateDTO> templateElement = unmarshaller.unmarshal(xsr, TemplateDTO.class); return templateElement.getValue(); } catch (final JAXBException | XMLStreamException e) { throw new FlowSerializationException(e); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java index 8de1316..cb311f5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/TemplateSerializer.java @@ -28,6 +28,16 @@ import java.io.IOException; public final class TemplateSerializer { + private static final JAXBContext jaxbContext; + + static { + try { + jaxbContext = JAXBContext.newInstance(TemplateDTO.class); + } catch (final JAXBException e) { + throw new RuntimeException("Cannot create JAXBContext for serializing templates", e); + } + } + /** * This method when called assumes the Framework Nar ClassLoader is in the * classloader hierarchy of the current context class loader. @@ -39,8 +49,7 @@ public final class TemplateSerializer { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final BufferedOutputStream bos = new BufferedOutputStream(baos); - JAXBContext context = JAXBContext.newInstance(TemplateDTO.class); - Marshaller marshaller = context.createMarshaller(); + final Marshaller marshaller = jaxbContext.createMarshaller(); marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); marshaller.marshal(dto, bos); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index 1f5cfee..ae881dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -57,8 +57,12 @@ public class StandardSchedulingContext implements SchedulingContext { throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService().getIdentifier() + " is not currently enabled"); } - if (!serviceNode.isValid()) { - throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService().getIdentifier() + " is not currently valid"); + switch (serviceNode.getValidationStatus()) { + case INVALID: + throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService().getIdentifier() + " is not currently valid"); + case VALIDATING: + throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + + serviceNode.getProxiedControllerService().getIdentifier() + " is in the process of validating its configuration"); } serviceNode.addReference(processorNode); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java index 2f38aee..58bd5cb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java @@ -36,6 +36,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.expression.ExpressionLanguageCompiler; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.registry.VariableRegistry; @@ -47,7 +48,6 @@ public class StandardValidationContext implements ValidationContext { private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; private final Map<String, Boolean> expressionLanguageSupported; private final String annotationData; - private final Set<String> serviceIdentifiersToNotValidate; private final VariableRegistry variableRegistry; private final String groupId; private final String componentId; @@ -68,7 +68,6 @@ public class StandardValidationContext implements ValidationContext { this.controllerServiceProvider = controllerServiceProvider; this.properties = new HashMap<>(properties); this.annotationData = annotationData; - this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate; this.variableRegistry = variableRegistry; this.groupId = groupId; this.componentId = componentId; @@ -141,7 +140,13 @@ public class StandardValidationContext implements ValidationContext { @Override public boolean isValidationRequired(final ControllerService service) { - return !serviceIdentifiersToNotValidate.contains(service.getIdentifier()); + // No need to validate services that are already enabled. + final ControllerServiceState serviceState = controllerServiceProvider.getControllerServiceNode(service.getIdentifier()).getState(); + if (serviceState == ControllerServiceState.ENABLED || serviceState == ControllerServiceState.ENABLING) { + return false; + } + + return true; } @Override @@ -164,4 +169,9 @@ public class StandardValidationContext implements ValidationContext { public String getProcessGroupIdentifier() { return groupId; } + + @Override + public String toString() { + return "StandardValidationContext[componentId=" + componentId + ", properties=" + properties + "]"; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index bdd328c..8d6e5e3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -37,7 +37,7 @@ import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; -import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.label.Label; @@ -315,7 +315,7 @@ public class NiFiRegistryFlowMapper { return versionedService; } - private Map<String, String> mapProperties(final ConfiguredComponent component, final ControllerServiceProvider serviceProvider) { + private Map<String, String> mapProperties(final ComponentNode component, final ControllerServiceProvider serviceProvider) { final Map<String, String> mapped = new HashMap<>(); component.getProperties().keySet().stream() @@ -341,7 +341,7 @@ public class NiFiRegistryFlowMapper { return mapped; } - private Map<String, VersionedPropertyDescriptor> mapPropertyDescriptors(final ConfiguredComponent component) { + private Map<String, VersionedPropertyDescriptor> mapPropertyDescriptors(final ComponentNode component) { final Map<String, VersionedPropertyDescriptor> descriptors = new HashMap<>(); for (final PropertyDescriptor descriptor : component.getProperties().keySet()) { final VersionedPropertyDescriptor versionedDescriptor = new VersionedPropertyDescriptor(); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ClassAnnotationPair.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ClassAnnotationPair.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ClassAnnotationPair.java new file mode 100644 index 0000000..44ffa70 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ClassAnnotationPair.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.util; + +import java.lang.annotation.Annotation; +import java.util.Arrays; + +public class ClassAnnotationPair { + private final Class<?> clazz; + private final Class<? extends Annotation>[] annotations; + + public ClassAnnotationPair(final Class<?> clazz, final Class<? extends Annotation>[] annotations) { + this.clazz = clazz; + this.annotations = annotations; + } + + public Class<?> getDeclaredClass() { + return clazz; + } + + public Class<? extends Annotation>[] getAnnotations() { + return annotations; + } + + @Override + public int hashCode() { + return 41 + 47 * clazz.hashCode() + 47 * Arrays.hashCode(annotations); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + + if (!(obj instanceof ClassAnnotationPair)) { + return false; + } + + final ClassAnnotationPair other = (ClassAnnotationPair) obj; + return clazz == other.clazz && Arrays.equals(annotations, other.annotations); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ReflectionUtils.java index 08feff6..5666f9c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ReflectionUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ReflectionUtils.java @@ -19,7 +19,11 @@ package org.apache.nifi.util; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.nifi.logging.ComponentLog; import org.slf4j.Logger; @@ -29,6 +33,7 @@ import org.springframework.core.annotation.AnnotationUtils; public class ReflectionUtils { private final static Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class); + private static ConcurrentMap<ClassAnnotationPair, List<Method>> annotationCache = new ConcurrentHashMap<>(); /** * Invokes all methods on the given instance that have been annotated with the given Annotation. If the signature of the method that is defined in <code>instance</code> uses 1 or more parameters, @@ -114,42 +119,71 @@ public class ReflectionUtils { * @return <code>true</code> if all appropriate methods were invoked and returned without throwing an Exception, <code>false</code> if one of the methods threw an Exception or could not be * invoked; if <code>false</code> is returned, an error will have been logged. */ - public static boolean quietlyInvokeMethodsWithAnnotations(final Class<? extends Annotation> preferredAnnotation, - final Class<? extends Annotation> alternateAnnotation, final Object instance, final Object... args) { + public static boolean quietlyInvokeMethodsWithAnnotations(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, + final Object instance, final Object... args) { return quietlyInvokeMethodsWithAnnotations(preferredAnnotation, alternateAnnotation, instance, null, args); } - private static boolean invokeMethodsWithAnnotations(boolean quietly, ComponentLog logger, Object instance, - Class<? extends Annotation>[] annotations, Object... args) - throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + private static boolean invokeMethodsWithAnnotations(boolean quietly, ComponentLog logger, Object instance, Class<? extends Annotation>[] annotations, Object... args) + throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + return invokeMethodsWithAnnotations(quietly, logger, instance, instance.getClass(), annotations, args); } - private static boolean invokeMethodsWithAnnotations(boolean quietly, ComponentLog logger, Object instance, - Class<?> clazz, Class<? extends Annotation>[] annotations, Object... args) - throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + private static boolean invokeMethodsWithAnnotations(boolean quietly, ComponentLog logger, Object instance, Class<?> clazz, Class<? extends Annotation>[] annotations, Object... args) + throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + boolean isSuccess = true; - for (Method method : clazz.getMethods()) { - if (isAnyAnnotationPresent(method, annotations)) { - Object[] modifiedArgs = buildUpdatedArgumentsList(quietly, method, annotations, logger, args); - if (modifiedArgs != null) { - try { - method.invoke(instance, modifiedArgs); - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - isSuccess = false; - if (quietly) { - logErrorMessage("Failed while invoking annotated method '" + method + "' with arguments '" - + Arrays.asList(modifiedArgs) + "'.", logger, e); - } else { - throw e; - } + final List<Method> methods = findMethodsWithAnnotations(clazz, annotations); + for (final Method method : methods) { + Object[] modifiedArgs = buildUpdatedArgumentsList(quietly, method, annotations, logger, args); + if (modifiedArgs != null) { + try { + method.invoke(instance, modifiedArgs); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + isSuccess = false; + if (quietly) { + logErrorMessage("Failed while invoking annotated method '" + method + "' with arguments '" + + Arrays.asList(modifiedArgs) + "'.", logger, e); + } else { + throw e; } } } } + return isSuccess; } + private static List<Method> findMethodsWithAnnotations(final Class<?> clazz, final Class<? extends Annotation>[] annotations) { + // We use a cache here to store a mapping of Class & Annotation[] to those methods that contain the annotation. + // This is done because discovering this using Reflection is fairly expensive (can take up to tens of milliseconds on laptop). + // While this may not seem like much time, consider deleting a Process Group with thousands of Processors or instantiating + // a Template with thousands of Processors. This can add up to several seconds very easily. + final ClassAnnotationPair pair = new ClassAnnotationPair(clazz, annotations); + List<Method> methods = annotationCache.get(pair); + if (methods != null) { + return methods; + } + + methods = discoverMethodsWithAnnotations(clazz, annotations); + annotationCache.putIfAbsent(pair, methods); + return methods; + } + + private static List<Method> discoverMethodsWithAnnotations(final Class<?> clazz, final Class<? extends Annotation>[] annotations) { + final List<Method> methods = new ArrayList<>(); + + for (Method method : clazz.getMethods()) { + if (isAnyAnnotationPresent(method, annotations)) { + methods.add(method); + } + } + + return methods; + } + + private static boolean isAnyAnnotationPresent(Method method, Class<? extends Annotation>[] annotations) { for (Class<? extends Annotation> annotation : annotations) { if (AnnotationUtils.findAnnotation(method, annotation) != null) { @@ -170,6 +204,7 @@ public class ReflectionUtils { } else { logErrorMessage("Can not invoke method '" + method + "' with provided arguments since argument " + i + " of type '" + paramTypes[i] + "' is not assignable from provided value of type '" + args[i].getClass() + "'.", processLogger, null); + if (quietly){ parametersCompatible = false; } else { http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index 392e92b..4750746 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -22,7 +22,6 @@ import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.serialization.FlowSerializationException; -import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.ScheduledStateLookup; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; @@ -45,6 +44,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.w3c.dom.Document; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -96,9 +96,10 @@ public class StandardFlowServiceTest { byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml")); flowService.load(new StandardDataFlow(flowBytes, null, null, new HashSet<>())); - FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); + StandardFlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP); + final Document doc = serializer.transform(flowController, ScheduledStateLookup.IDENTITY_LOOKUP); + serializer.serialize(doc, baos); String expectedFlow = new String(flowBytes).trim(); String actualFlow = new String(baos.toByteArray()).trim(); @@ -120,9 +121,10 @@ public class StandardFlowServiceTest { flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-inheritable.xml")); flowService.load(new StandardDataFlow(flowBytes, null, null, new HashSet<>())); - FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); + StandardFlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP); + final Document doc = serializer.transform(flowController, ScheduledStateLookup.IDENTITY_LOOKUP); + serializer.serialize(doc, baos); String expectedFlow = new String(flowBytes).trim(); String actualFlow = new String(baos.toByteArray()).trim(); @@ -140,9 +142,10 @@ public class StandardFlowServiceTest { fail("should have thrown " + UninheritableFlowException.class); } catch (UninheritableFlowException ufe) { - FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); + StandardFlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP); + final Document doc = serializer.transform(flowController, ScheduledStateLookup.IDENTITY_LOOKUP); + serializer.serialize(doc, baos); String expectedFlow = new String(originalBytes).trim(); String actualFlow = new String(baos.toByteArray()).trim(); @@ -162,9 +165,10 @@ public class StandardFlowServiceTest { fail("should have thrown " + FlowSerializationException.class); } catch (FlowSerializationException ufe) { - FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); + StandardFlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP); + final Document doc = serializer.transform(flowController, ScheduledStateLookup.IDENTITY_LOOKUP); + serializer.serialize(doc, baos); String expectedFlow = new String(originalBytes).trim(); String actualFlow = new String(baos.toByteArray()).trim(); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index 5838871..fd3f83b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -72,9 +72,11 @@ import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; +import java.net.URLClassLoader; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -105,7 +107,7 @@ public class TestFlowController { private Bundle systemBundle; private BulletinRepository bulletinRepo; private VariableRegistry variableRegistry; - private volatile String propsFile = TestFlowController.class.getResource("/flowcontrollertest.nifi.properties").getFile(); + private volatile String propsFile = "src/test/resources/flowcontrollertest.nifi.properties"; @Before public void setup() { @@ -208,7 +210,7 @@ public class TestFlowController { assertEquals(rootGroupCs.getProperties(), controllerCs.getProperties()); // should be one processor - final Set<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors(); + final Collection<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors(); assertNotNull(processorNodes); assertEquals(1, processorNodes.size()); @@ -261,7 +263,7 @@ public class TestFlowController { assertNotNull(rootGroupCs); // should be one processor - final Set<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors(); + final Collection<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors(); assertNotNull(processorNodes); assertEquals(1, processorNodes.size()); @@ -652,15 +654,15 @@ public class TestFlowController { final String id = "ServiceA" + System.currentTimeMillis(); final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate(); final ControllerServiceNode controllerServiceNode = controller.createControllerService(ServiceA.class.getName(), id, coordinate, null, true); - final String originalName = controllerServiceNode.getName(); // the instance class loader shouldn't have any of the resources yet - InstanceClassLoader instanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + URLClassLoader instanceClassLoader = ExtensionManager.getInstanceClassLoader(id); assertNotNull(instanceClassLoader); assertFalse(containsResource(instanceClassLoader.getURLs(), resource1)); assertFalse(containsResource(instanceClassLoader.getURLs(), resource2)); assertFalse(containsResource(instanceClassLoader.getURLs(), resource3)); - assertTrue(instanceClassLoader.getAdditionalResourceUrls().isEmpty()); + assertTrue(instanceClassLoader instanceof InstanceClassLoader); + assertTrue(((InstanceClassLoader) instanceClassLoader).getAdditionalResourceUrls().isEmpty()); controller.reload(controllerServiceNode, ServiceB.class.getName(), coordinate, additionalUrls); @@ -670,7 +672,8 @@ public class TestFlowController { assertTrue(containsResource(instanceClassLoader.getURLs(), resource1)); assertTrue(containsResource(instanceClassLoader.getURLs(), resource2)); assertTrue(containsResource(instanceClassLoader.getURLs(), resource3)); - assertEquals(3, instanceClassLoader.getAdditionalResourceUrls().size()); + assertTrue(instanceClassLoader instanceof InstanceClassLoader); + assertEquals(3, ((InstanceClassLoader) instanceClassLoader).getAdditionalResourceUrls().size()); } @Test http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java index 552db3d..eabc899 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java @@ -71,6 +71,7 @@ import org.apache.nifi.test.processors.ModifiesClasspathProcessor; import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.MockVariableRegistry; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.SynchronousValidationTrigger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -98,7 +99,7 @@ public class TestStandardProcessorNode { final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, coordinate, null); final StandardProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, createValidationContextFactory(), null, null, - NiFiProperties.createBasicNiFiProperties(null, null), new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); + NiFiProperties.createBasicNiFiProperties(null, null), new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, new SynchronousValidationTrigger()); final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestClasspathResources", true); final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false); @@ -118,6 +119,7 @@ public class TestStandardProcessorNode { } }; + procNode.performValidation(); procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback, true); Thread.sleep(1000L); @@ -126,22 +128,6 @@ public class TestStandardProcessorNode { assertEquals(1, processor.onStoppedCount); } - @Test - public void testDisabledValidationErrors() { - final MockReloadComponent reloadComponent = new MockReloadComponent(); - final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor(); - final StandardProcessorNode procNode = createProcessorNode(processor, reloadComponent); - - // Set a property to an invalid value - final Map<String, String> properties = new HashMap<>(); - properties.put(ModifiesClasspathNoAnnotationProcessor.CLASSPATH_RESOURCE.getName(), ""); - procNode.setProperties(properties); - Assert.assertTrue(procNode.getValidationErrors().size() > 0); - - // Disabled processors skip property validation - procNode.disable(); - Assert.assertFalse(procNode.getValidationErrors().size() > 0); - } @Test public void testSinglePropertyDynamicallyModifiesClasspath() throws MalformedURLException { @@ -175,7 +161,7 @@ public class TestStandardProcessorNode { assertEquals(ModifiesClasspathProcessor.class.getCanonicalName(), reloadComponent.getNewType()); // Should pass validation - assertTrue(procNode.isValid()); + assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } @@ -214,7 +200,7 @@ public class TestStandardProcessorNode { } // Should pass validation - assertTrue(procNode.isValid()); + assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); // Simulate setting updating the other property which should not change the classpath final Map<String, String> otherProperties = new HashMap<>(); @@ -227,7 +213,7 @@ public class TestStandardProcessorNode { } // Should STILL pass validation - assertTrue(procNode.isValid()); + assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); // Lets update the classpath property and make sure the resources get updated final Map<String, String> newClasspathProperties = new HashMap<>(); @@ -242,7 +228,7 @@ public class TestStandardProcessorNode { assertEquals(ModifiesClasspathProcessor.class.getCanonicalName(), reloadComponent.getNewType()); // Should STILL pass validation - assertTrue(procNode.isValid()); + assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } @@ -287,7 +273,7 @@ public class TestStandardProcessorNode { assertEquals(ModifiesClasspathProcessor.class.getCanonicalName(), reloadComponent.getNewType()); // Should pass validation - assertTrue(procNode.isValid()); + assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } @@ -329,7 +315,7 @@ public class TestStandardProcessorNode { assertEquals(ModifiesClasspathProcessor.class.getCanonicalName(), reloadComponent.getNewType()); // Should pass validation - assertTrue(procNode.isValid()); + assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } @@ -356,8 +342,7 @@ public class TestStandardProcessorNode { assertEquals(ModifiesClasspathNoAnnotationProcessor.class.getCanonicalName(), reloadComponent.getNewType()); // Should pass validation - assertTrue(procNode.isValid()); - + assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } @@ -415,7 +400,7 @@ public class TestStandardProcessorNode { final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), componentLog); return new StandardProcessorNode(loggableComponent, uuid, validationContextFactory, processScheduler, - null, niFiProperties, new StandardComponentVariableRegistry(variableRegistry), reloadComponent); + null, niFiProperties, new StandardComponentVariableRegistry(variableRegistry), reloadComponent, new SynchronousValidationTrigger()); } private static class MockReloadComponent implements ReloadComponent { http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java index 2d7b22f..f8a4b43 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java @@ -35,6 +35,7 @@ import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.SynchronousValidationTrigger; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -67,9 +68,12 @@ public class StandardProcessSchedulerIT { @Test public void validateLongEnablingServiceCanStillBeDisabled() throws Exception { final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, + stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger()); + final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); + final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); ts.setLimit(3000); scheduler.enableControllerService(serviceNode); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index d459f5c..50c60a0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -16,6 +16,28 @@ */ package org.apache.nifi.controller.scheduling; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; + import org.apache.commons.io.FileUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -27,7 +49,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; -import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; @@ -56,30 +77,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; -import java.util.function.Supplier; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - /** * Validate Processor's life-cycle operation within the context of * {@link FlowController} and {@link StandardProcessScheduler} @@ -89,7 +86,7 @@ public class TestProcessorLifecycle { private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class); private FlowController fc; private Map<String, String> properties = new HashMap<>(); - private volatile String propsFile = TestProcessorLifecycle.class.getResource("/lifecycletest.nifi.properties").getFile(); + private volatile String propsFile = "src/test/resources/lifecycletest.nifi.properties"; @Before public void before() throws Exception { @@ -161,6 +158,7 @@ public class TestProcessorLifecycle { assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); ProcessScheduler ps = fc.getProcessScheduler(); + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); } @@ -185,6 +183,7 @@ public class TestProcessorLifecycle { this.noop(testProcessor); final ProcessScheduler ps = fc.getProcessScheduler(); + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); ps.startProcessor(testProcNode, true); ps.startProcessor(testProcNode, true); @@ -343,6 +342,7 @@ public class TestProcessorLifecycle { this.longRunningOnSchedule(testProcessor, delay); ProcessScheduler ps = fc.getProcessScheduler(); + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L); @@ -377,6 +377,7 @@ public class TestProcessorLifecycle { testProcessor.keepFailingOnScheduledTimes = 2; ProcessScheduler ps = fc.getProcessScheduler(); + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L); ps.stopProcessor(testProcNode); @@ -406,6 +407,7 @@ public class TestProcessorLifecycle { testProcessor.keepFailingOnScheduledTimes = Integer.MAX_VALUE; ProcessScheduler ps = fc.getProcessScheduler(); + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); @@ -431,6 +433,7 @@ public class TestProcessorLifecycle { this.blockingInterruptableOnUnschedule(testProcessor); ProcessScheduler ps = fc.getProcessScheduler(); + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); @@ -456,6 +459,7 @@ public class TestProcessorLifecycle { this.blockingUninterruptableOnUnschedule(testProcessor); ProcessScheduler ps = fc.getProcessScheduler(); + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L); ps.stopProcessor(testProcNode); @@ -483,6 +487,7 @@ public class TestProcessorLifecycle { testProcessor.generateExceptionOnTrigger = true; ProcessScheduler ps = fc.getProcessScheduler(); + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.disableProcessor(testProcNode); @@ -564,83 +569,16 @@ public class TestProcessorLifecycle { this.noop(testProcessor); ProcessScheduler ps = fc.getProcessScheduler(); + testServiceNode.performValidation(); ps.enableControllerService(testServiceNode); + + testProcNode.performValidation(); ps.startProcessor(testProcNode, true); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); } - /** - * Test deletion of processor when connected to another - * - * @throws Exception exception - */ - @Test - public void validateProcessorDeletion() throws Exception { - final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); - fc = fcsb.getFlowController(); - - ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); - this.setControllerRootGroup(fc, testGroup); - - ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), - fcsb.getSystemBundle().getBundleDetails().getCoordinate()); - testProcNodeA.setProperties(properties); - testGroup.addProcessor(testProcNodeA); - - ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), - fcsb.getSystemBundle().getBundleDetails().getCoordinate()); - testProcNodeB.setProperties(properties); - testGroup.addProcessor(testProcNodeB); - - Collection<String> relationNames = new ArrayList<>(); - relationNames.add("relation"); - Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames); - testGroup.addConnection(connection); - - ProcessScheduler ps = fc.getProcessScheduler(); - ps.startProcessor(testProcNodeA, true); - ps.startProcessor(testProcNodeB, true); - - try { - testGroup.removeProcessor(testProcNodeA); - fail(); - } catch (Exception e) { - // should throw exception because processor running - } - - try { - testGroup.removeProcessor(testProcNodeB); - fail(); - } catch (Exception e) { - // should throw exception because processor running - } - - ps.stopProcessor(testProcNodeB); - Thread.sleep(100); - - try { - testGroup.removeProcessor(testProcNodeA); - fail(); - } catch (Exception e) { - // should throw exception because destination processor running - } - - try { - testGroup.removeProcessor(testProcNodeB); - fail(); - } catch (Exception e) { - // should throw exception because source processor running - } - - ps.stopProcessor(testProcNodeA); - Thread.sleep(100); - - testGroup.removeProcessor(testProcNodeA); - testGroup.removeProcessor(testProcNodeB); - testGroup.shutdown(); - } /** * Scenario where onTrigger() is executed with random delay limited to @@ -762,11 +700,12 @@ public class TestProcessorLifecycle { /** */ public static class TestProcessor extends AbstractProcessor { + private static final Runnable NOP = () -> {}; - private Runnable onScheduleCallback; - private Runnable onUnscheduleCallback; - private Runnable onStopCallback; - private Runnable onTriggerCallback; + private Runnable onScheduleCallback = NOP; + private Runnable onUnscheduleCallback = NOP; + private Runnable onStopCallback = NOP; + private Runnable onTriggerCallback = NOP; private boolean generateExceptionOnScheduled; private boolean generateExceptionOnTrigger; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index ea721b8..2f1d0cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -83,6 +83,7 @@ import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.SynchronousValidationTrigger; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -128,7 +129,7 @@ public class TestStandardProcessScheduler { final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class); final LoggableComponent<ReportingTask> loggableComponent = new LoggableComponent<>(reportingTask, systemBundle.getBundleDetails().getCoordinate(), logger); taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, - new StandardComponentVariableRegistry(variableRegistry), reloadComponent); + new StandardComponentVariableRegistry(variableRegistry), reloadComponent, new SynchronousValidationTrigger()); controller = Mockito.mock(FlowController.class); @@ -169,6 +170,7 @@ public class TestStandardProcessScheduler { */ @Test public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedException, InitializationException { + taskNode.performValidation(); scheduler.schedule(taskNode); // Let it try to run a few times. @@ -193,8 +195,8 @@ public class TestStandardProcessScheduler { final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class); - final StandardControllerServiceProvider serviceProvider - = new StandardControllerServiceProvider(controller, scheduler, null, Mockito.mock(StateManagerProvider.class), variableRegistry, nifiProperties); + final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null, + Mockito.mock(StateManagerProvider.class), variableRegistry, nifiProperties, new SynchronousValidationTrigger()); final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", systemBundle.getBundleDetails().getCoordinate(), null, true); rootGroup.addControllerService(service); @@ -202,14 +204,17 @@ public class TestStandardProcessScheduler { final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null); final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, new StandardValidationContextFactory(serviceProvider, variableRegistry), - scheduler, serviceProvider, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); + scheduler, serviceProvider, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, new SynchronousValidationTrigger()); rootGroup.addProcessor(procNode); Map<String, String> procProps = new HashMap<>(); procProps.put(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier()); procNode.setProperties(procProps); + service.performValidation(); scheduler.enableControllerService(service); + + procNode.performValidation(); scheduler.startProcessor(procNode, true); Thread.sleep(25L); @@ -277,9 +282,12 @@ public class TestStandardProcessScheduler { @Test public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception { final StandardProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, + stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger()); + final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); + assertFalse(serviceNode.isActive()); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); final ExecutorService executor = Executors.newCachedThreadPool(); @@ -316,7 +324,9 @@ public class TestStandardProcessScheduler { @Test public void validateDisabledServiceCantBeDisabled() throws Exception { final StandardProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, + variableRegistry, nifiProperties, new SynchronousValidationTrigger()); + final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); @@ -354,7 +364,8 @@ public class TestStandardProcessScheduler { @Test public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception { final StandardProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, + variableRegistry, nifiProperties, new SynchronousValidationTrigger()); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); @@ -388,7 +399,8 @@ public class TestStandardProcessScheduler { @Test public void validateDisablingOfTheFailedService() throws Exception { final StandardProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, + stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger()); final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); scheduler.enableControllerService(serviceNode); @@ -420,7 +432,8 @@ public class TestStandardProcessScheduler { @Ignore public void validateEnabledDisableMultiThread() throws Exception { final StandardProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, + variableRegistry, nifiProperties, new SynchronousValidationTrigger()); final ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 200; i++) { final ControllerServiceNode serviceNode = provider.createControllerService(RandomShortDelayEnablingService.class.getName(), "1", @@ -463,7 +476,8 @@ public class TestStandardProcessScheduler { @Test public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception { final StandardProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, + stateMgrProvider, variableRegistry, nifiProperties, new SynchronousValidationTrigger()); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), null, false); final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); @@ -492,8 +506,9 @@ public class TestStandardProcessScheduler { final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(), new StandardValidationContextFactory(controller, variableRegistry), - scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); + scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, new SynchronousValidationTrigger()); + procNode.performValidation(); rootGroup.addProcessor(procNode); scheduler.startProcessor(procNode, true); @@ -517,10 +532,11 @@ public class TestStandardProcessScheduler { final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(), new StandardValidationContextFactory(controller, variableRegistry), - scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); + scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, new SynchronousValidationTrigger()); rootGroup.addProcessor(procNode); + procNode.performValidation(); scheduler.startProcessor(procNode, true); while (!proc.isSucceess()) { Thread.sleep(5L); @@ -545,10 +561,11 @@ public class TestStandardProcessScheduler { final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(), new StandardValidationContextFactory(controller, variableRegistry), - scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); + scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, new SynchronousValidationTrigger()); rootGroup.addProcessor(procNode); + procNode.performValidation(); scheduler.startProcessor(procNode, true); Thread.sleep(100L);