http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index fca31d5..afdb3e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -66,6 +66,8 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationState; +import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -93,8 +95,8 @@ import org.apache.nifi.util.CharacterFilterUtils; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; -import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import org.apache.nifi.util.ThreadUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,7 +124,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final Map<Connection, Connectable> 
destinations; private final Map<Relationship, Set<Connection>> connections; private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate; - private final AtomicReference<List<Connection>> incomingConnectionsRef; + private final AtomicReference<List<Connection>> incomingConnections; private final AtomicBoolean lossTolerant; private final AtomicReference<String> comments; private final AtomicReference<Position> position; @@ -144,23 +146,26 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private ExecutionNode executionNode; private final long onScheduleTimeoutMillis; private final Map<Thread, ActiveTask> activeThreads = new HashMap<>(48); + private final int hashCode; + private volatile boolean hasActiveThreads = false; public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties, - final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) { + final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger) { - this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, - processor.getComponent().getClass().getSimpleName(), processor.getComponent().getClass().getCanonicalName(), nifiProperties, variableRegistry, reloadComponent, false); + this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider, processor.getComponent().getClass().getSimpleName(), + processor.getComponent().getClass().getCanonicalName(), nifiProperties, variableRegistry, reloadComponent, validationTrigger, false); } public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid, final ValidationContextFactory validationContextFactory, final 
ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider, final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties, - final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { + final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger, + final boolean isExtensionMissing) { - super(uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing); + super(uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, validationTrigger, isExtensionMissing); final ProcessorDetails processorDetails = new ProcessorDetails(processor); this.processorRef = new AtomicReference<>(processorDetails); @@ -168,7 +173,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable identifier = new AtomicReference<>(uuid); destinations = new HashMap<>(); connections = new HashMap<>(); - incomingConnectionsRef = new AtomicReference<>(new ArrayList<>()); + incomingConnections = new AtomicReference<>(new ArrayList<>()); lossTolerant = new AtomicBoolean(false); final Set<Relationship> emptySetOfRelationships = new HashSet<>(); undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); @@ -189,6 +194,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; executionNode = ExecutionNode.ALL; + this.hashCode = new HashCodeBuilder(7, 67).append(identifier).toHashCode(); + try { if (processorDetails.getProcClass().isAnnotationPresent(DefaultSchedule.class)) { DefaultSchedule dsc = processorDetails.getProcClass().getAnnotation(DefaultSchedule.class); @@ -393,7 +400,9 @@ public class 
StandardProcessorNode extends ProcessorNode implements Connectable + "' as auto-terminated because Connection already exists with this relationship"); } } + undefinedRelationshipsToTerminate.set(new HashSet<>(terminate)); + resetValidationState(); } /** @@ -619,11 +628,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } - final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), - TimeUnit.MILLISECONDS); + + final long penalizationMillis = FormatUtils.getTimeDuration(requireNonNull(penalizationPeriod), TimeUnit.MILLISECONDS); if (penalizationMillis < 0) { throw new IllegalArgumentException("Penalization duration must be positive"); } + this.penalizationPeriod.set(penalizationPeriod); } @@ -641,10 +651,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); } + if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) { throw new IllegalArgumentException("Cannot set Concurrent Tasks to " + taskCount + " for component " + getIdentifier() + " because Scheduling Strategy is not Event Driven"); } + if (!isTriggeredSerially()) { concurrentTaskCount.set(taskCount); } @@ -656,8 +668,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } /** - * @return the number of tasks that may execute concurrently for this - * processor + * @return the number of tasks that may execute concurrently for this processor */ @Override public int getMaxConcurrentTasks() { @@ -686,7 +697,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public List<Connection> getIncomingConnections() { - return incomingConnectionsRef.get(); + return 
incomingConnections.get(); } @Override @@ -705,121 +716,130 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable "Cannot a connection to a ProcessorNode for which the ProcessorNode is neither the Source nor the Destination"); } - List<Connection> updatedIncoming = null; - if (connection.getDestination().equals(this)) { - // don't add the connection twice. This may occur if we have a - // self-loop because we will be told - // to add the connection once because we are the source and again - // because we are the destination. - final List<Connection> incomingConnections = incomingConnectionsRef.get(); - updatedIncoming = new ArrayList<>(incomingConnections); - if (!updatedIncoming.contains(connection)) { - updatedIncoming.add(connection); + try { + List<Connection> updatedIncoming = null; + if (connection.getDestination().equals(this)) { + // don't add the connection twice. This may occur if we have a + // self-loop because we will be told + // to add the connection once because we are the source and again + // because we are the destination. + final List<Connection> incomingConnections = getIncomingConnections(); + updatedIncoming = new ArrayList<>(incomingConnections); + if (!updatedIncoming.contains(connection)) { + updatedIncoming.add(connection); + } } - } - if (connection.getSource().equals(this)) { - // don't add the connection twice. This may occur if we have a - // self-loop because we will be told - // to add the connection once because we are the source and again - // because we are the destination. - if (!destinations.containsKey(connection)) { - for (final Relationship relationship : connection.getRelationships()) { - final Relationship rel = getRelationship(relationship.getName()); - Set<Connection> set = connections.get(rel); - if (set == null) { - set = new HashSet<>(); - connections.put(rel, set); - } + if (connection.getSource().equals(this)) { + // don't add the connection twice. 
This may occur if we have a + // self-loop because we will be told + // to add the connection once because we are the source and again + // because we are the destination. + if (!destinations.containsKey(connection)) { + for (final Relationship relationship : connection.getRelationships()) { + final Relationship rel = getRelationship(relationship.getName()); + Set<Connection> set = connections.get(rel); + if (set == null) { + set = new HashSet<>(); + connections.put(rel, set); + } - set.add(connection); + set.add(connection); - destinations.put(connection, connection.getDestination()); - } + destinations.put(connection, connection.getDestination()); + } - final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); - if (autoTerminated != null) { - autoTerminated.removeAll(connection.getRelationships()); - this.undefinedRelationshipsToTerminate.set(autoTerminated); + final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); + if (autoTerminated != null) { + autoTerminated.removeAll(connection.getRelationships()); + this.undefinedRelationshipsToTerminate.set(autoTerminated); + } } } - } - if (updatedIncoming != null) { - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); + if (updatedIncoming != null) { + setIncomingConnections(Collections.unmodifiableList(updatedIncoming)); + } + } finally { + resetValidationState(); } } @Override public boolean hasIncomingConnection() { - return !incomingConnectionsRef.get().isEmpty(); + return !getIncomingConnections().isEmpty(); } @Override public void updateConnection(final Connection connection) throws IllegalStateException { - if (requireNonNull(connection).getSource().equals(this)) { - // update any relationships - // - // first check if any relations were removed. 
- final List<Relationship> existingRelationships = new ArrayList<>(); - for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) { - if (entry.getValue().contains(connection)) { - existingRelationships.add(entry.getKey()); + try { + if (requireNonNull(connection).getSource().equals(this)) { + // update any relationships + // + // first check if any relations were removed. + final List<Relationship> existingRelationships = new ArrayList<>(); + for (final Map.Entry<Relationship, Set<Connection>> entry : connections.entrySet()) { + if (entry.getValue().contains(connection)) { + existingRelationships.add(entry.getKey()); + } } - } - for (final Relationship rel : connection.getRelationships()) { - if (!existingRelationships.contains(rel)) { - // relationship was removed. Check if this is legal. - final Set<Connection> connectionsForRelationship = getConnections(rel); - if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() + for (final Relationship rel : connection.getRelationships()) { + if (!existingRelationships.contains(rel)) { + // relationship was removed. Check if this is legal. + final Set<Connection> connectionsForRelationship = getConnections(rel); + if (connectionsForRelationship != null && connectionsForRelationship.size() == 1 && this.isRunning() && !isAutoTerminated(rel) && getRelationships().contains(rel)) { - // if we are running and we do not terminate undefined - // relationships and this is the only - // connection that defines the given relationship, and - // that relationship is required, - // then it is not legal to remove this relationship from - // this connection. 
- throw new IllegalStateException("Cannot remove relationship " + rel.getName() + // if we are running and we do not terminate undefined + // relationships and this is the only + // connection that defines the given relationship, and + // that relationship is required, + // then it is not legal to remove this relationship from + // this connection. + throw new IllegalStateException("Cannot remove relationship " + rel.getName() + " from Connection because doing so would invalidate Processor " + this + ", which is currently running"); + } } } - } - // remove the connection from any list that currently contains - for (final Set<Connection> list : connections.values()) { - list.remove(connection); - } + // remove the connection from any list that currently contains + for (final Set<Connection> list : connections.values()) { + list.remove(connection); + } - // add the connection in for all relationships listed. - for (final Relationship rel : connection.getRelationships()) { - Set<Connection> set = connections.get(rel); - if (set == null) { - set = new HashSet<>(); - connections.put(rel, set); + // add the connection in for all relationships listed. 
+ for (final Relationship rel : connection.getRelationships()) { + Set<Connection> set = connections.get(rel); + if (set == null) { + set = new HashSet<>(); + connections.put(rel, set); + } + set.add(connection); } - set.add(connection); - } - // update to the new destination - destinations.put(connection, connection.getDestination()); + // update to the new destination + destinations.put(connection, connection.getDestination()); - final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); - if (autoTerminated != null) { - autoTerminated.removeAll(connection.getRelationships()); - this.undefinedRelationshipsToTerminate.set(autoTerminated); + final Set<Relationship> autoTerminated = this.undefinedRelationshipsToTerminate.get(); + if (autoTerminated != null) { + autoTerminated.removeAll(connection.getRelationships()); + this.undefinedRelationshipsToTerminate.set(autoTerminated); + } } - } - if (connection.getDestination().equals(this)) { - // update our incoming connections -- we can just remove & re-add - // the connection to update the list. - final List<Connection> incomingConnections = incomingConnectionsRef.get(); - final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); - updatedIncoming.remove(connection); - updatedIncoming.add(connection); - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); + if (connection.getDestination().equals(this)) { + // update our incoming connections -- we can just remove & re-add + // the connection to update the list. + final List<Connection> incomingConnections = getIncomingConnections(); + final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); + updatedIncoming.remove(connection); + updatedIncoming.add(connection); + setIncomingConnections(Collections.unmodifiableList(updatedIncoming)); + } + } finally { + // need to perform validation in case selected relationships were changed. 
+ resetValidationState(); } } @@ -844,11 +864,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } if (connection.getDestination().equals(this)) { - final List<Connection> incomingConnections = incomingConnectionsRef.get(); + final List<Connection> incomingConnections = getIncomingConnections(); if (incomingConnections.contains(connection)) { final List<Connection> updatedIncoming = new ArrayList<>(incomingConnections); updatedIncoming.remove(connection); - incomingConnectionsRef.set(Collections.unmodifiableList(updatedIncoming)); + setIncomingConnections(Collections.unmodifiableList(updatedIncoming)); return; } } @@ -857,6 +877,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable throw new IllegalArgumentException( "Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source"); } + + resetValidationState(); + } + + private void setIncomingConnections(final List<Connection> incoming) { + this.incomingConnections.set(incoming); + resetValidationState(); } /** @@ -968,7 +995,18 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public boolean isRunning() { - return getScheduledState().equals(ScheduledState.RUNNING) || processScheduler.getActiveThreadCount(this) > 0; + return getScheduledState().equals(ScheduledState.RUNNING) || hasActiveThreads; + } + + @Override + public boolean isValidationNecessary() { + switch (getScheduledState()) { + case STOPPED: + case STOPPING: + return true; + } + + return false; } @Override @@ -988,100 +1026,62 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return nonLoopConnections; } - @Override - public boolean isValid() { - try { - final ValidationContext validationContext = getValidationContext(); - final Collection<ValidationResult> validationResults = super.validate(validationContext); - - for (final ValidationResult result : validationResults) { - if 
(!result.isValid()) { - return false; - } - } - - for (final Relationship undef : getUndefinedRelationships()) { - if (!isAutoTerminated(undef)) { - return false; - } - } - switch (getInputRequirement()) { - case INPUT_ALLOWED: - break; - case INPUT_FORBIDDEN: { - if (!getIncomingNonLoopConnections().isEmpty()) { - return false; - } - break; - } - case INPUT_REQUIRED: { - if (getIncomingNonLoopConnections().isEmpty()) { - return false; - } - break; - } - } - } catch (final Throwable t) { - LOG.warn("Failed during validation", t); - return false; - } - return true; + @Override + public Collection<ValidationResult> getValidationErrors() { + final ValidationState validationState = getValidationState(); + return validationState.getValidationErrors(); } @Override - public Collection<ValidationResult> getValidationErrors() { + protected Collection<ValidationResult> computeValidationErrors(final ValidationContext validationContext) { final List<ValidationResult> results = new ArrayList<>(); try { - // Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off - // we are willing to make in order to save on validation costs that would be unnecessary most of the time. 
- if (getScheduledState() == ScheduledState.STOPPED) { - final ValidationContext validationContext = getValidationContext(); - - final Collection<ValidationResult> validationResults = super.validate(validationContext); - - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - results.add(result); - } + final Collection<ValidationResult> validationResults = super.computeValidationErrors(validationContext); + + validationResults.stream() + .filter(result -> !result.isValid()) + .forEach(results::add); + + // Ensure that any relationships that don't have a connection defined are auto-terminated + for (final Relationship relationship : getUndefinedRelationships()) { + if (!isAutoTerminated(relationship)) { + final ValidationResult error = new ValidationResult.Builder() + .explanation("Relationship '" + relationship.getName() + + "' is not connected to any component and is not auto-terminated") + .subject("Relationship " + relationship.getName()).valid(false).build(); + results.add(error); } + } - for (final Relationship relationship : getUndefinedRelationships()) { - if (!isAutoTerminated(relationship)) { - final ValidationResult error = new ValidationResult.Builder() - .explanation("Relationship '" + relationship.getName() - + "' is not connected to any component and is not auto-terminated") - .subject("Relationship " + relationship.getName()).valid(false).build(); - results.add(error); + // Ensure that the requirements of the InputRequirement are met. 
+ switch (getInputRequirement()) { + case INPUT_ALLOWED: + break; + case INPUT_FORBIDDEN: { + final int incomingConnCount = getIncomingNonLoopConnections().size(); + if (incomingConnCount != 0) { + results.add(new ValidationResult.Builder().explanation( + "Processor does not allow upstream connections but currently has " + incomingConnCount) + .subject("Upstream Connections").valid(false).build()); } + break; } - - switch (getInputRequirement()) { - case INPUT_ALLOWED: - break; - case INPUT_FORBIDDEN: { - final int incomingConnCount = getIncomingNonLoopConnections().size(); - if (incomingConnCount != 0) { - results.add(new ValidationResult.Builder().explanation( - "Processor does not allow upstream connections but currently has " + incomingConnCount) - .subject("Upstream Connections").valid(false).build()); - } - break; - } - case INPUT_REQUIRED: { - if (getIncomingNonLoopConnections().isEmpty()) { - results.add(new ValidationResult.Builder() - .explanation("Processor requires an upstream connection but currently has none") - .subject("Upstream Connections").valid(false).build()); - } - break; + case INPUT_REQUIRED: { + if (getIncomingNonLoopConnections().isEmpty()) { + results.add(new ValidationResult.Builder() + .explanation("Processor requires an upstream connection but currently has none") + .subject("Upstream Connections").valid(false).build()); } + break; } } } catch (final Throwable t) { + LOG.error("Failed to perform validation", t); results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()) .valid(false).build()); } + return results; } @@ -1108,7 +1108,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public int hashCode() { - return new HashCodeBuilder(7, 67).append(identifier).toHashCode(); + return hashCode; } @Override @@ -1135,7 +1135,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public synchronized void 
setProcessGroup(final ProcessGroup group) { this.processGroup.set(group); - invalidateValidationContext(); + resetValidationState(); } @Override @@ -1162,11 +1162,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public Collection<ValidationResult> validate(final ValidationContext validationContext) { - return getValidationErrors(); - } - - @Override public void verifyCanDelete() throws IllegalStateException { verifyCanDelete(false); } @@ -1184,7 +1179,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } } - for (final Connection connection : incomingConnectionsRef.get()) { + for (final Connection connection : getIncomingConnections()) { if (connection.getSource().equals(this)) { connection.verifyCanDelete(); } else { @@ -1208,22 +1203,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable verifyNoActiveThreads(); - if (ignoredReferences != null) { - final Set<String> ids = new HashSet<>(); - for (final ControllerServiceNode node : ignoredReferences) { - ids.add(node.getIdentifier()); - } + switch (getValidationStatus()) { + case VALID: + return; + case VALIDATING: + throw new IllegalStateException("Processor with ID " + getIdentifier() + " cannot be started because its validation is still being performed"); + } - final Collection<ValidationResult> validationResults = getValidationErrors(ids); - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not valid: " + result); - } - } - } else { - if (!isValid()) { - throw new IllegalStateException(this.getIdentifier() + " is not in a valid state"); - } + final Collection<ValidationResult> validationErrors = getValidationErrors(ignoredReferences); + if (ignoredReferences != null && !validationErrors.isEmpty()) { + throw new IllegalStateException("Processor with ID " + getIdentifier() + " 
cannot be started because it is not currently valid"); } } @@ -1264,9 +1253,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } private void verifyNoActiveThreads() throws IllegalStateException { - final int threadCount = processScheduler.getActiveThreadCount(this); - if (threadCount > 0) { - throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active"); + if (hasActiveThreads) { + final int threadCount = getActiveThreadCount(); + if (threadCount > 0) { + throw new IllegalStateException(this.getIdentifier() + " has " + threadCount + " threads still active"); + } } } @@ -1314,9 +1305,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final ProcessContext processContext, final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) { - if (!this.isValid()) { - throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors()); + switch (getValidationStatus()) { + case INVALID: + throw new IllegalStateException("Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors()); + case VALIDATING: + throw new IllegalStateException("Processor " + this.getName() + " cannot be started because its validation is still being performed"); } + final Processor processor = processorRef.get().getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); @@ -1339,6 +1334,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } if (starting) { // will ensure that the Processor represented by this node can only be started once + hasActiveThreads = true; initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback); } else { final 
String procName = processorRef.get().toString(); @@ -1468,6 +1464,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable try { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); + hasActiveThreads = false; } finally { deactivateThread(); } @@ -1487,6 +1484,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable try { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); + hasActiveThreads = false; } finally { deactivateThread(); } @@ -1597,6 +1595,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } scheduleState.decrementActiveThreadCount(null); + hasActiveThreads = false; scheduledState.set(ScheduledState.STOPPED); future.complete(null);
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java index 616f62c..70b3590 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java @@ -182,6 +182,7 @@ public class TemplateUtils { processorDTO.setExtensionMissing(null); processorDTO.setMultipleVersionsAvailable(null); processorDTO.setValidationErrors(null); + processorDTO.setValidationStatus(null); processorDTO.setInputRequirement(null); processorDTO.setDescription(null); processorDTO.setInputRequirement(null); @@ -236,6 +237,7 @@ public class TemplateUtils { serviceDTO.setCustomUiUrl(null); serviceDTO.setValidationErrors(null); + serviceDTO.setValidationStatus(null); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index b02baa7..9651add 100644 
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -18,8 +18,6 @@ package org.apache.nifi.controller.reporting; import java.net.URL; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -28,7 +26,8 @@ import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.AbstractConfiguredComponent; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.AbstractComponentNode; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.LoggableComponent; @@ -51,7 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.AnnotationUtils; -public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode { +public abstract class AbstractReportingTaskNode extends AbstractComponentNode implements ReportingTaskNode { private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class); @@ -68,20 +67,20 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, final ValidationContextFactory 
validationContextFactory, final ComponentVariableRegistry variableRegistry, - final ReloadComponent reloadComponent) { + final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger) { this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory, reportingTask.getComponent().getClass().getSimpleName(), reportingTask.getComponent().getClass().getCanonicalName(), - variableRegistry, reloadComponent, false); + variableRegistry, reloadComponent, validationTrigger, false); } public AbstractReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, - final ReloadComponent reloadComponent, final boolean isExtensionMissing) { + final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) { - super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing); + super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, validationTrigger, isExtensionMissing); this.reportingTaskRef = new AtomicReference<>(new ReportingTaskDetails(reportingTask)); this.processScheduler = processScheduler; this.serviceLookup = controllerServiceProvider; @@ -173,6 +172,11 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon } @Override + public boolean isValidationNecessary() { + return !processScheduler.isScheduled(this); + } + + @Override public int getActiveThreadCount() { return processScheduler.getActiveThreadCount(this); } @@ -283,16 +287,9 @@ public abstract class 
AbstractReportingTaskNode extends AbstractConfiguredCompon throw new IllegalStateException(this.getIdentifier() + " cannot be started because it has " + activeThreadCount + " active threads already"); } - final Set<String> ids = new HashSet<>(); - for (final ControllerServiceNode node : ignoredReferences) { - ids.add(node.getIdentifier()); - } - - final Collection<ValidationResult> validationResults = getValidationErrors(ids); - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not valid: " + result); - } + final Collection<ValidationResult> validationResults = getValidationErrors(ignoredReferences); + if (!validationResults.isEmpty()) { + throw new IllegalStateException(this.getIdentifier() + " cannot be started because it is not currently valid"); } } @@ -305,14 +302,4 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon public String getProcessGroupIdentifier() { return null; } - - @Override - public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) { - Collection<ValidationResult> results = null; - if (getScheduledState() == ScheduledState.STOPPED) { - results = super.getValidationErrors(serviceIdentifiersNotToValidate); - } - return results != null ? 
results : Collections.emptySet(); - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java index dbe2f51..124f142 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java @@ -22,6 +22,7 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.controller.ProcessScheduler; @@ -38,17 +39,17 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, - final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) { - super(reportingTask, id, controller, processScheduler, validationContextFactory, 
variableRegistry, reloadComponent); + final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger) { + super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, reloadComponent, validationTrigger); this.flowController = controller; } public StandardReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final FlowController controller, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, final String componentType, final String canonicalClassName, final ComponentVariableRegistry variableRegistry, - final ReloadComponent reloadComponent, final boolean isExtensionMissing) { + final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) { super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName, - variableRegistry, reloadComponent, isExtensionMissing); + variableRegistry, reloadComponent, validationTrigger, isExtensionMissing); this.flowController = controller; } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 491abf0..8f7eb2f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -84,7 +84,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { // thread pool for starting/stopping components private final ScheduledExecutorService componentLifeCycleThreadPool; - private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processore Lifecycle", true); + private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle", true); private final StringEncryptor encryptor; @@ -180,8 +180,11 @@ public final class StandardProcessScheduler implements ProcessScheduler { throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running"); } - if (!taskNode.isValid()) { - throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors()); + switch (taskNode.getValidationStatus()) { + case INVALID: + throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors()); + case VALIDATING: + throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be scheduled because it is in the process of validating its configuration"); } final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); @@ -379,7 +382,22 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void onProcessorRemoved(final ProcessorNode procNode) { - this.lifecycleStates.remove(procNode); + lifecycleStates.remove(procNode); + } + + @Override + public void onPortRemoved(final Port port) { + lifecycleStates.remove(port); + } + + @Override + public void 
onFunnelRemoved(Funnel funnel) { + lifecycleStates.remove(funnel); + } + + @Override + public void onReportingTaskRemoved(final ReportingTaskNode reportingTask) { + lifecycleStates.remove(reportingTask); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java index b01f6e3..d0b7bcd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java @@ -24,20 +24,28 @@ import org.apache.nifi.controller.FlowController; * Serializes the flow configuration of a controller instance to an output stream. * */ -public interface FlowSerializer { +public interface FlowSerializer<T> { public static final String ENC_PREFIX = "enc{"; public static final String ENC_SUFFIX = "}"; /** - * Serializes the flow configuration of a controller instance. 
+ * Transforms the flow configuration of a controller instance into something that can be serialized * * @param controller a controller - * @param os an output stream to write the configuration to * @param stateLookup a lookup that can be used to determine the ScheduledState of a Processor * + * @return a form of the flow configuration that can be serialized by the {@link #serialize(Object, OutputStream)} method * @throws FlowSerializationException if serialization failed */ - void serialize(FlowController controller, OutputStream os, ScheduledStateLookup stateLookup) throws FlowSerializationException; + T transform(FlowController controller, ScheduledStateLookup stateLookup) throws FlowSerializationException; + /** + * Serializes the flow configuration to the given Output Stream + * + * @param flowConfiguration the flow configuration to serialize + * @param os the output stream to serialize to + * @throws FlowSerializationException if serialization failed + */ + void serialize(T flowConfiguration, OutputStream os) throws FlowSerializationException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index b0fe508..e2676a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -16,6 +16,26 @@ */ package org.apache.nifi.controller.serialization; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.Optional; +import java.util.WeakHashMap; +import java.util.concurrent.TimeUnit; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.TransformerFactoryConfigurationError; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.connectable.ConnectableType; @@ -51,41 +71,30 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.TransformerFactoryConfigurationError; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - /** * Serializes a Flow Controller as XML to an 
output stream. * * NOT THREAD-SAFE. */ -public class StandardFlowSerializer implements FlowSerializer { +public class StandardFlowSerializer implements FlowSerializer<Document> { private static final String MAX_ENCODING_VERSION = "1.3"; private final StringEncryptor encryptor; + // Cache of template to DOM Node for that template. This is done because when we serialize templates, we have to first + // take the template DTO, then serialize that into a byte[], then parse that byte[] as XML DOM objects. Then we can use that + // XML DOM Object in the serialized flow. This is expensive, so we cache these XML DOM objects here. We use a WeakHashMap + // because we don't get notified when the template has been removed from the system. + private static final Map<Template, Node> templateNodes = new WeakHashMap<>(); + public StandardFlowSerializer(final StringEncryptor encryptor) { this.encryptor = encryptor; } + @Override - public void serialize(final FlowController controller, final OutputStream os, final ScheduledStateLookup scheduledStateLookup) throws FlowSerializationException { + public Document transform(final FlowController controller, final ScheduledStateLookup scheduledStateLookup) throws FlowSerializationException { try { // create a new, empty document final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); @@ -120,7 +129,16 @@ public class StandardFlowSerializer implements FlowSerializer { addReportingTask(reportingTasksNode, taskNode, encryptor); } - final DOMSource domSource = new DOMSource(doc); + return doc; + } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) { + throw new FlowSerializationException(e); + } + } + + @Override + public void serialize(final Document flowConfiguration, final OutputStream os) throws FlowSerializationException { + try { + final DOMSource domSource = new DOMSource(flowConfiguration); final StreamResult streamResult = new 
StreamResult(new BufferedOutputStream(os)); // configure the transformer and convert the DOM @@ -132,7 +150,7 @@ public class StandardFlowSerializer implements FlowSerializer { // transform the document to byte stream transformer.transform(domSource, streamResult); - } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) { + } catch (final DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) { throw new FlowSerializationException(e); } } @@ -607,18 +625,25 @@ public class StandardFlowSerializer implements FlowSerializer { element.appendChild(toAdd); } - public static void addTemplate(final Element element, final Template template) { - try { - final byte[] serialized = TemplateSerializer.serialize(template.getDetails()); - final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); - final DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder(); - final Document document; - try (final InputStream in = new ByteArrayInputStream(serialized)) { - document = docBuilder.parse(in); + public static synchronized void addTemplate(final Element element, final Template template) { + try { + Node templateNode = templateNodes.get(template); + if (templateNode == null) { + final byte[] serialized = TemplateSerializer.serialize(template.getDetails()); + + final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); + final DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder(); + final Document document; + try (final InputStream in = new ByteArrayInputStream(serialized)) { + document = docBuilder.parse(in); + } + + templateNode = document.getDocumentElement(); + templateNodes.put(template, templateNode); } - final Node templateNode = element.getOwnerDocument().importNode(document.getDocumentElement(), true); + templateNode = 
element.getOwnerDocument().importNode(templateNode, true); element.appendChild(templateNode); } catch (final Exception e) { throw new FlowSerializationException(e); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index 633f0ed..102ae26 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -146,17 +146,26 @@ public class ControllerServiceLoader { final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState()); if (state == ControllerServiceState.ENABLED) { nodesToEnable.add(node); + logger.debug("Will enable Controller Service {}", node); + } else { + logger.debug("Will not enable Controller Service {} because its state is set to {}", node, state); } } enableControllerServices(nodesToEnable, controller, autoResumeState); + } else { + logger.debug("Will not enable the following Controller Services because 'auto-resume state' flag is false: {}", nodeMap.keySet()); } } public static void enableControllerServices(final Collection<ControllerServiceNode> nodesToEnable, final FlowController controller, final boolean autoResumeState) { // Start services if (autoResumeState) { + logger.debug("Enabling Controller Services {}", nodesToEnable); + 
nodesToEnable.stream().forEach(ControllerServiceNode::performValidation); // validate services before attempting to enable them controller.enableControllerServices(nodesToEnable); + } else { + logger.debug("Will not enable the following Controller Services because 'auto-resume state' flag is false: {}", nodesToEnable); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java index 148b847..319c18c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java @@ -21,11 +21,18 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.nifi.controller.ComponentNode; + public class ServiceStateTransition { private ControllerServiceState state = ControllerServiceState.DISABLED; private final List<CompletableFuture<?>> enabledFutures = new ArrayList<>(); private final List<CompletableFuture<?>> disabledFutures = new ArrayList<>(); + private final ControllerServiceNode serviceNode; + + public ServiceStateTransition(final ControllerServiceNode serviceNode) { + this.serviceNode = serviceNode; + } public synchronized boolean transitionToEnabling(final ControllerServiceState expectedState, final CompletableFuture<?> 
enabledFuture) { if (expectedState != state) { @@ -43,10 +50,23 @@ public class ServiceStateTransition { } state = ControllerServiceState.ENABLED; + + validateReferences(serviceNode); + enabledFutures.stream().forEach(future -> future.complete(null)); return true; } + private void validateReferences(final ControllerServiceNode service) { + for (final ComponentNode component : service.getReferences().getReferencingComponents()) { + component.performValidation(); + + if (component instanceof ControllerServiceNode) { + validateReferences((ControllerServiceNode) component); + } + } + } + public synchronized boolean transitionToDisabling(final ControllerServiceState expectedState, final CompletableFuture<?> disabledFuture) { if (expectedState != state) { return false; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java index 6e8ff6c..3c6a003 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java @@ -27,21 +27,21 @@ import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ConfigurationContext; 
-import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.FormatUtils; public class StandardConfigurationContext implements ConfigurationContext { - private final ConfiguredComponent component; + private final ComponentNode component; private final ControllerServiceLookup serviceLookup; private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; private final VariableRegistry variableRegistry; private final String schedulingPeriod; private final Long schedulingNanos; - public StandardConfigurationContext(final ConfiguredComponent component, final ControllerServiceLookup serviceLookup, final String schedulingPeriod, + public StandardConfigurationContext(final ComponentNode component, final ControllerServiceLookup serviceLookup, final String schedulingPeriod, final VariableRegistry variableRegistry) { this.component = component; this.serviceLookup = serviceLookup; http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java index fed09af..ea83edc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java @@ -60,6 +60,7 @@ public class StandardControllerServiceInvocationHandler implements ControllerSer this.serviceNodeHolder.set(serviceNode); } + @Override public void setServiceNode(final ControllerServiceNode serviceNode) { this.serviceNodeHolder.set(serviceNode); } @@ -75,14 +76,8 @@ public class StandardControllerServiceInvocationHandler implements ControllerSer final ControllerServiceState state = node.getState(); final boolean disabled = state != ControllerServiceState.ENABLED; // only allow method call if service state is ENABLED. if (disabled && !validDisabledMethods.contains(method)) { - // Use nar class loader here because we are implicitly calling toString() on the original implementation. - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { - throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService.getIdentifier() - + " because the Controller Service is disabled"); - } catch (final Throwable e) { - throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier " - + originalService.getIdentifier() + " because the Controller Service is disabled"); - } + throw new ControllerServiceDisabledException(node.getIdentifier(), "Cannot invoke method " + method + " on Controller Service with identifier " + + serviceNodeHolder.get().getIdentifier() + " because the Controller Service's State is currently " + state); } try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { 
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 9d26eef..74050e2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -48,9 +48,11 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.AbstractConfiguredComponent; +import org.apache.nifi.components.validation.ValidationState; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.AbstractComponentNode; +import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.controller.ReloadComponent; @@ -68,20 +70,20 @@ import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StandardControllerServiceNode extends AbstractConfiguredComponent 
implements ControllerServiceNode { +public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode { private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class); private final AtomicReference<ControllerServiceDetails> controllerServiceHolder = new AtomicReference<>(null); private final ControllerServiceProvider serviceProvider; - private final ServiceStateTransition stateTransition = new ServiceStateTransition(); + private final ServiceStateTransition stateTransition; private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); - private final Set<ConfiguredComponent> referencingComponents = new HashSet<>(); + private final Set<ComponentNode> referencingComponents = new HashSet<>(); private String comment; private ProcessGroup processGroup; @@ -89,22 +91,24 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService, final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory, - final ControllerServiceProvider serviceProvider, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent) { + final ControllerServiceProvider serviceProvider, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, + final ValidationTrigger validationTrigger) { - this(implementation, proxiedControllerService, invocationHandler, id, validationContextFactory, serviceProvider, - implementation.getComponent().getClass().getSimpleName(), implementation.getComponent().getClass().getCanonicalName(), 
variableRegistry, reloadComponent, false); + this(implementation, proxiedControllerService, invocationHandler, id, validationContextFactory, serviceProvider, implementation.getComponent().getClass().getSimpleName(), + implementation.getComponent().getClass().getCanonicalName(), variableRegistry, reloadComponent, validationTrigger, false); } public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService, final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass, - final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final boolean isExtensionMissing) { + final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger, + final boolean isExtensionMissing) { - super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing); + super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, validationTrigger, isExtensionMissing); this.serviceProvider = serviceProvider; this.active = new AtomicBoolean(); setControllerServiceAndProxy(implementation, proxiedControllerService, invocationHandler); - + stateTransition = new ServiceStateTransition(this); } @Override @@ -218,7 +222,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i writeLock.lock(); try { this.processGroup = group; - invalidateValidationContext(); + resetValidationState(); } finally { writeLock.unlock(); } @@ -235,7 +239,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } @Override - public void 
addReference(final ConfiguredComponent referencingComponent) { + public void addReference(final ComponentNode referencingComponent) { writeLock.lock(); try { referencingComponents.add(referencingComponent); @@ -252,7 +256,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i if (descriptor.getControllerServiceDefinition() != null && entry.getValue() != null) { ControllerServiceNode requiredNode = serviceProvider.getControllerServiceNode(entry.getValue()); requiredServices.add(requiredNode); - requiredServices.addAll(requiredNode.getRequiredControllerServices()); } } return new ArrayList<>(requiredServices); @@ -260,7 +263,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override - public void removeReference(final ConfiguredComponent referencingComponent) { + public void removeReference(final ComponentNode referencingComponent) { writeLock.lock(); try { referencingComponents.remove(referencingComponent); @@ -297,7 +300,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i final ControllerServiceReference references = getReferences(); final Set<String> activeReferencesIdentifiers = new HashSet<>(); - for (final ConfiguredComponent activeReference : references.getActiveReferences()) { + for (final ComponentNode activeReference : references.getActiveReferences()) { if (!ignoreReferences.contains(activeReference)) { activeReferencesIdentifiers.add(activeReference.getIdentifier()); } @@ -315,8 +318,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled"); } - if (!isValid()) { - throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not valid: " + getValidationErrors()); + final ValidationState validationState = getValidationState(); + switch 
(validationState.getStatus()) { + case INVALID: + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not valid: " + validationState.getValidationErrors()); + case VALIDATING: + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because its validation has not yet completed"); } } @@ -326,16 +333,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled"); } - final Set<String> ids = new HashSet<>(); - for (final ControllerServiceNode node : ignoredReferences) { - ids.add(node.getIdentifier()); - } - - final Collection<ValidationResult> validationResults = getValidationErrors(ids); - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not valid: " + result); - } + final Collection<ValidationResult> validationErrors = getValidationErrors(ignoredReferences); + if (ignoredReferences != null && !validationErrors.isEmpty()) { + throw new IllegalStateException("Controller Service with ID " + getIdentifier() + " cannot be enabled because it is not currently valid"); } } @@ -381,6 +381,19 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i return this.active.get(); } + @Override + public boolean isValidationNecessary() { + switch (getState()) { + case DISABLED: + case DISABLING: + return true; + case ENABLED: + case ENABLING: + default: + return false; + } + } + /** * Will atomically enable this service by invoking its @OnEnabled operation. 
* It uses CAS operation on {@link #stateRef} to transition this service @@ -460,6 +473,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i return future; } + /** * Will atomically disable this service by invoking its @OnDisabled operation. * It uses CAS operation on {@link #stateRef} to transition this service @@ -496,6 +510,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i invokeDisable(configContext); } finally { stateTransition.disable(); + + // Now all components that reference this service will be invalid. Trigger validation to occur so that + // this is reflected in any response that may go back to a user/client. + for (final ComponentNode component : getReferences().getReferencingComponents()) { + component.performValidation(); + } } } }); @@ -506,9 +526,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i return future; } - /** - * - */ + private void invokeDisable(ConfigurationContext configContext) { try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext); @@ -528,15 +546,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } @Override - public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) { - Collection<ValidationResult> results = null; - if (getState() == ControllerServiceState.DISABLED) { - results = super.getValidationErrors(serviceIdentifiersNotToValidate); - } - return results != null ? results : Collections.emptySet(); - } - - @Override public Optional<String> getVersionedComponentId() { return Optional.ofNullable(versionedComponentId.get()); }