NIFI-277: Added verifyCanXX methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d734220d Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d734220d Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d734220d Branch: refs/heads/annotations Commit: d734220d1e59ff02878a2b9f3913348e8d38ae17 Parents: 7bcfc93 Author: Mark Payne <marka...@hotmail.com> Authored: Fri Jan 16 15:51:34 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Fri Jan 16 15:51:34 2015 -0500 ---------------------------------------------------------------------- .../nifi/controller/ReportingTaskNode.java | 16 +++++ .../service/ControllerServiceNode.java | 6 +- .../reporting/AbstractReportingTaskNode.java | 51 ++++++++++++++++ .../service/StandardControllerServiceNode.java | 61 +++++++++++++++++--- 4 files changed, 126 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java index 6b8ede0..f456ddd 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java @@ -53,4 +53,20 @@ public interface ReportingTaskNode extends ConfiguredComponent { ConfigurationContext getConfigurationContext(); boolean isRunning(); + + /** + * Indicates the {@link ScheduledState} of this <code>ReportingTask</code>. A + * value of stopped does NOT indicate that the <code>ReportingTask</code> has + * no active threads, only that it is not currently scheduled to be given + * any more threads. To determine whether or not the + * <code>ReportingTask</code> has any active threads, see + * {@link ProcessScheduler#getActiveThreadCount(ReportingTask)}. + * + * @return + */ + ScheduledState getScheduledState(); + + void setScheduledState(ScheduledState state); + + void verifyCanDelete(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 6f9c237..dd4b49a 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -22,7 +22,9 @@ import org.apache.nifi.controller.ControllerService; public interface ControllerServiceNode extends ConfiguredComponent { - ControllerService getControllerService(); + ControllerService getProxiedControllerService(); + + ControllerService getControllerServiceImplementation(); Availability getAvailability(); @@ -37,4 +39,6 @@ public interface ControllerServiceNode extends ConfiguredComponent { void addReference(ConfiguredComponent referringComponent); void removeReference(ConfiguredComponent referringComponent); + + void verifyCanDelete(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 6c27470..8b10a84 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -19,18 +19,25 @@ package org.apache.nifi.controller.reporting; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.Availability; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.annotation.OnConfigured; +import org.apache.nifi.controller.exception.ProcessorLifeCycleException; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; +import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.ReflectionUtils; public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode { @@ -42,6 +49,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins"); private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY); + private volatile ScheduledState scheduledState = ScheduledState.STOPPED; + public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id, final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory) { @@ -108,4 +117,46 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon } } + @Override + public ScheduledState getScheduledState() { + return scheduledState; + } + + @Override + public void setScheduledState(final ScheduledState state) { + this.scheduledState = state; + } + + @Override + public void setProperty(final String name, final String value) { + super.setProperty(name, value); + + onConfigured(); + } + + @Override + public boolean removeProperty(String name) { + final boolean removed = super.removeProperty(name); + if ( removed ) { + onConfigured(); + } + + return removed; + } + + private void onConfigured() { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup); + ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext); + } catch (final Exception e) { + throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e); + } + } + + @Override + public void verifyCanDelete() { + if (isRunning()) { + throw new IllegalStateException(this + " is running"); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 455eac1..61a3aa8 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -26,13 +26,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.Availability; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.annotation.OnConfigured; +import org.apache.nifi.controller.exception.ProcessorLifeCycleException; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.util.ReflectionUtils; public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode { - private final ControllerService controllerService; + private final ControllerService proxedControllerService; + private final ControllerService implementation; + private final ControllerServiceProvider serviceProvider; private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY); private final AtomicBoolean disabled = new AtomicBoolean(true); @@ -43,10 +50,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private final Set<ConfiguredComponent> referencingComponents = new HashSet<>(); - public StandardControllerServiceNode(final ControllerService controllerService, final String id, + public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { - super(controllerService, id, validationContextFactory, serviceProvider); - this.controllerService = controllerService; + super(proxiedControllerService, id, validationContextFactory, serviceProvider); + this.proxedControllerService = proxiedControllerService; + this.implementation = implementation; + this.serviceProvider = serviceProvider; } @Override @@ -57,7 +66,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void setDisabled(final boolean disabled) { if (!disabled && !isValid()) { - throw new IllegalStateException("Cannot enable Controller Service " + controllerService + " because it is not valid"); + throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid"); } if (disabled) { @@ -82,8 +91,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } @Override - public ControllerService getControllerService() { - return controllerService; + public ControllerService getProxiedControllerService() { + return proxedControllerService; + } + + @Override + public ControllerService getControllerServiceImplementation() { + return implementation; } @Override @@ -122,4 +136,37 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first."); } } + + @Override + public void setProperty(final String name, final String value) { + super.setProperty(name, value); + + onConfigured(); + } + + @Override + public boolean removeProperty(String name) { + final boolean removed = super.removeProperty(name); + if ( removed ) { + onConfigured(); + } + + return removed; + } + + private void onConfigured() { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider); + ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext); + } catch (final Exception e) { + throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e); + } + } + + @Override + public void verifyCanDelete() { + if ( !isDisabled() ) { + throw new IllegalStateException(this + " cannot be deleted because it has not been disabled"); + } + } }