NIFI-3725 - validate processors only when they are in STOPPED state - report validation errors via REST API on processors/services/tasks/ports only when they are in the STOPPED state
Signed-off-by: Joe Skora <jsk...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8a18d266 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8a18d266 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8a18d266 Branch: refs/heads/support/nifi-0.7.x Commit: 8a18d266666c28bcb0e2df6b30359976638637d9 Parents: b69b3b6 Author: Mike Moser <mose...@apache.org> Authored: Wed Apr 26 18:00:32 2017 +0000 Committer: Joe Skora <jsk...@apache.org> Committed: Thu Apr 27 15:39:25 2017 -0400 ---------------------------------------------------------------------- .../apache/nifi/controller/FlowController.java | 12 ++-- .../nifi/controller/StandardProcessorNode.java | 72 +++++++++++--------- .../reporting/AbstractReportingTaskNode.java | 10 +++ .../service/StandardControllerServiceNode.java | 9 +++ .../nifi/remote/StandardRemoteGroupPort.java | 49 +++++++------ .../nifi/remote/StandardRootGroupPort.java | 17 +++-- 6 files changed, 103 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 4559706..611e471 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2551,16 +2551,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R status.setBytesSent(entry.getBytesSent()); } - // determine the run status and get any validation errors... must check - // is valid when not disabled since a processors validity could change due - // to environmental conditions (property configured with a file path and - // the file being externally removed) + // Determine the run status and get any validation errors... only validating while STOPPED + // is a trade-off we are willing to make, even though processor validity could change due to + // environmental conditions (property configured with a file path and the file being externally + // removed). This saves on validation costs that would be unnecessary most of the time. if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { status.setRunStatus(RunStatus.Disabled); - } else if (!procNode.isValid()) { - status.setRunStatus(RunStatus.Invalid); } else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) { status.setRunStatus(RunStatus.Running); + } else if (!procNode.isValid()) { + status.setRunStatus(RunStatus.Invalid); } else { status.setRunStatus(RunStatus.Stopped); } http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 5b45d2a..bb93fa2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -946,50 +946,54 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final List<ValidationResult> results = new ArrayList<>(); try { - final ValidationContext validationContext = this.getValidationContextFactory() - .newValidationContext(getProperties(), getAnnotationData()); - - final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - validationResults = getProcessor().validate(validationContext); - } + // Processors may go invalid while RUNNING, but only validating while STOPPED is a trade-off + // we are willing to make in order to save on validation costs that would be unnecessary most of the time. + if (getScheduledState() == ScheduledState.STOPPED) { + final ValidationContext validationContext = this.getValidationContextFactory() + .newValidationContext(getProperties(), getAnnotationData()); + + final Collection<ValidationResult> validationResults; + try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { + validationResults = getProcessor().validate(validationContext); + } - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - results.add(result); + for (final ValidationResult result : validationResults) { + if (!result.isValid()) { + results.add(result); + } } - } - for (final Relationship relationship : getUndefinedRelationships()) { - if (!isAutoTerminated(relationship)) { - final ValidationResult error = new ValidationResult.Builder() + for (final Relationship relationship : getUndefinedRelationships()) { + if (!isAutoTerminated(relationship)) { + final ValidationResult error = new ValidationResult.Builder() .explanation("Relationship '" + relationship.getName() + "' is not connected to any component and is not auto-terminated") .subject("Relationship " + relationship.getName()).valid(false).build(); - results.add(error); + results.add(error); + } } - } - switch (getInputRequirement()) { - case INPUT_ALLOWED: - break; - case INPUT_FORBIDDEN: { - final int incomingConnCount = getIncomingNonLoopConnections().size(); - if (incomingConnCount != 0) { - results.add(new ValidationResult.Builder().explanation( - "Processor does not allow upstream connections but currently has " + incomingConnCount) - .subject("Upstream Connections").valid(false).build()); + switch (getInputRequirement()) { + case INPUT_ALLOWED: + break; + case INPUT_FORBIDDEN: { + final int incomingConnCount = getIncomingNonLoopConnections().size(); + if (incomingConnCount != 0) { + results.add(new ValidationResult.Builder().explanation( + "Processor does not allow upstream connections but currently has " + incomingConnCount) + .subject("Upstream Connections").valid(false).build()); + } + break; + } + case INPUT_REQUIRED: { + if (getIncomingNonLoopConnections().isEmpty()) { + results.add(new ValidationResult.Builder() + .explanation("Processor requires an upstream connection but currently has none") + .subject("Upstream Connections").valid(false).build()); + } + break; } - break; - } - case INPUT_REQUIRED: { - if (getIncomingNonLoopConnections().isEmpty()) { - results.add(new ValidationResult.Builder() - .explanation("Processor requires an upstream connection but currently has none") - .subject("Upstream Connections").valid(false).build()); } - break; - } } } catch (final Throwable t) { results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()) http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 31c2242..21a6553 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.reporting; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -259,4 +260,13 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon public String toString() { return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]"; } + + @Override + public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) { + Collection<ValidationResult> results = null; + if (getScheduledState() == ScheduledState.STOPPED) { + results = super.getValidationErrors(serviceIdentifiersNotToValidate); + } + return results != null ? results : Collections.<ValidationResult>emptySet(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 4aa9ab6..e8e3810 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -395,4 +395,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString()); } } + + @Override + public Collection<ValidationResult> getValidationErrors(Set<String> serviceIdentifiersNotToValidate) { + Collection<ValidationResult> results = null; + if (stateRef.get() == ControllerServiceState.DISABLED) { + results = super.getValidationErrors(serviceIdentifiersNotToValidate); + } + return results != null ? results : Collections.<ValidationResult>emptySet(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 9f6f783..dd44bc8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.groups.ProcessGroup; @@ -350,7 +351,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ + logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); } @@ -364,31 +365,41 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { @Override public boolean isValid() { - return getValidationErrors().isEmpty(); + if (!targetExists.get()) { + return false; + } + + if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) { + // if it's an output port, ensure that there is an outbound connection + return false; + } + + return true; } @Override public Collection<ValidationResult> getValidationErrors() { final Collection<ValidationResult> validationErrors = new ArrayList<>(); - ValidationResult error = null; - if (!targetExists.get()) { - error = new ValidationResult.Builder() - .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) - .subject(String.format("Remote port '%s'", getName())) - .valid(false) - .build(); - } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) { - error = new ValidationResult.Builder() - .explanation(String.format("Port '%s' has no outbound connections", getName())) - .subject(String.format("Remote port '%s'", getName())) - .valid(false) - .build(); - } + if (getScheduledState() == ScheduledState.STOPPED) { + ValidationResult error = null; + if (!targetExists.get()) { + error = new ValidationResult.Builder() + .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) + .subject(String.format("Remote port '%s'", getName())) + .valid(false) + .build(); + } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) { + error = new ValidationResult.Builder() + .explanation(String.format("Port '%s' has no outbound connections", getName())) + .subject(String.format("Remote port '%s'", getName())) + .valid(false) + .build(); + } - if (error != null) { - validationErrors.add(error); + if (error != null) { + validationErrors.add(error); + } } - return validationErrors; } http://git-wip-us.apache.org/repos/asf/nifi/blob/8a18d266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java index 66fd303..71932f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java @@ -44,6 +44,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.EventReporter; import org.apache.nifi.groups.ProcessGroup; @@ -262,13 +263,15 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort @Override public Collection<ValidationResult> getValidationErrors() { final Collection<ValidationResult> validationErrors = new ArrayList<>(); - if (!isValid()) { - final ValidationResult error = new ValidationResult.Builder() - .explanation(String.format("Output connection for port '%s' is not defined.", getName())) - .subject(String.format("Port '%s'", getName())) - .valid(false) - .build(); - validationErrors.add(error); + if (getScheduledState() == ScheduledState.STOPPED) { + if (!isValid()) { + final ValidationResult error = new ValidationResult.Builder() + .explanation(String.format("Output connection for port '%s' is not defined.", getName())) + .subject(String.format("Port '%s'", getName())) + .valid(false) + .build(); + validationErrors.add(error); + } } return validationErrors; }