NIFI-1464 addressed latest PR comments NIFI-1464 polishing
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/48af0bfb Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/48af0bfb Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/48af0bfb Branch: refs/heads/master Commit: 48af0bfbc5cafe06e003b2bd086b970cf49b5025 Parents: f53f45d Author: Oleg Zhurakousky <[email protected]> Authored: Fri Feb 26 12:42:48 2016 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Mar 11 12:54:50 2016 -0500 ---------------------------------------------------------------------- .../apache/nifi/controller/ProcessorNode.java | 55 ++++++++++++++++++ .../nifi/controller/StandardFlowSerializer.java | 4 +- .../nifi/controller/StandardProcessorNode.java | 9 +-- .../scheduling/StandardProcessScheduler.java | 8 +-- .../scheduling/TestProcessorLifecycle.java | 61 +++++++++++++++++++- .../org/apache/nifi/web/api/dto/DtoFactory.java | 47 +++++++-------- 6 files changed, 143 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index ff7977d..a428349 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.connectable.Connectable; @@ -31,12 +32,19 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { + private static final Logger logger = LoggerFactory.getLogger(ProcessorNode.class); + + protected final AtomicReference<ScheduledState> scheduledState; + public ProcessorNode(final Processor processor, final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) { super(processor, id, validationContextFactory, serviceProvider); + this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED); } public abstract boolean isIsolated(); @@ -101,6 +109,31 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences); /** + * + */ + @Override + public ScheduledState getScheduledState() { + return this.scheduledState.get(); + } + + /** + * Returns the logical state of this processor. Logical state ignores + * transition states such as STOPPING and STARTING rounding it up to the + * next logical state of STOPPED and RUNNING respectively. + * + * @return the logical state of this processor [DISABLED, STOPPED, RUNNING] + */ + public ScheduledState getLogicalScheduledState() { + ScheduledState sc = this.scheduledState.get(); + if (sc == ScheduledState.STARTING) { + return ScheduledState.RUNNING; + } else if (sc == ScheduledState.STOPPING) { + return ScheduledState.STOPPED; + } + return sc; + } + + /** * Will start the {@link Processor} represented by this * {@link ProcessorNode}. Starting processor typically means invoking its * operation that is annotated with @OnScheduled and then executing a @@ -145,4 +178,26 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen */ public abstract <T extends ProcessContext & ControllerServiceLookup> void stop(ScheduledExecutorService scheduler, T processContext, Callable<Boolean> activeThreadMonitorCallback); + + /** + * Will set the state of the processor to STOPPED which essentially implies + * that this processor can be started. This is idempotent operation and will + * result in the WARN message if processor can not be enabled. + */ + public void enable() { + if (!this.scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED)) { + logger.warn("Processor cannot be enabled because it is not disabled"); + } + } + + /** + * Will set the state of the processor to DISABLED which essentially implies + * that this processor can NOT be started. This is idempotent operation and + * will result in the WARN message if processor can not be enabled. + */ + public void disable() { + if (!this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED)) { + logger.warn("Processor cannot be disabled because its state is set to " + this.scheduledState); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java index 1ee85a2..e9f958c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java @@ -255,7 +255,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "comments", port.getComments()); addTextElement(element, "scheduledState", port.getScheduledState().name()); addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks()); - addTextElement(element, "useCompression", String.valueOf(((RemoteGroupPort) port).isUseCompression())); + addTextElement(element, "useCompression", String.valueOf(port.isUseCompression())); parentElement.appendChild(element); } @@ -311,7 +311,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "yieldPeriod", processor.getYieldPeriod()); addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString()); addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant())); - addTextElement(element, "scheduledState", processor.getScheduledState().name()); + addTextElement(element, "scheduledState", processor.getLogicalScheduledState().name()); addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index dac56b5..2a0819c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -106,7 +106,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final AtomicReference<List<Connection>> incomingConnectionsRef; private final AtomicBoolean isolated; private final AtomicBoolean lossTolerant; - private final AtomicReference<ScheduledState> scheduledState; private final AtomicReference<String> comments; private final AtomicReference<Position> position; private final AtomicReference<String> annotationData; @@ -145,7 +144,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable destinations = new HashMap<>(); connections = new HashMap<>(); incomingConnectionsRef = new AtomicReference<List<Connection>>(new ArrayList<Connection>()); - scheduledState = new AtomicReference<>(ScheduledState.STOPPED); lossTolerant = new AtomicBoolean(false); final Set<Relationship> emptySetOfRelationships = new HashSet<>(); undefinedRelationshipsToTerminate = new AtomicReference<>(emptySetOfRelationships); @@ -214,11 +212,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override - public ScheduledState getScheduledState() { - return this.scheduledState.get(); - } - - @Override public Position getPosition() { return position.get(); } @@ -1276,7 +1269,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable }; taskScheduler.execute(startProcRunnable); } else { - LOG.warn("Can not start Processor since it's already in the process of being started"); + LOG.warn("Can not start Processor since it's already in the process of being started or it is DISABLED"); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 84d667f..bbaa23b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -473,16 +473,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public synchronized void enableProcessor(final ProcessorNode procNode) { - if (procNode.getScheduledState() != ScheduledState.DISABLED) { - throw new IllegalStateException("Processor cannot be enabled because it is not disabled"); - } + procNode.enable(); } @Override public synchronized void disableProcessor(final ProcessorNode procNode) { - if (procNode.getScheduledState() != ScheduledState.STOPPED) { - throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState()); - } + procNode.disable(); } public synchronized void enableReportingTask(final ReportingTaskNode taskNode) { http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 2cce14d..d78009c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -89,6 +89,53 @@ public class TestProcessorLifecycle { FileUtils.deleteDirectory(new File("./target/content_repository")); } + @Test + public void validateEnableOperation() throws Exception { + FlowController fc = this.buildFlowControllerForTest(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); + this.setControllerRootGroup(fc, testGroup); + final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), + UUID.randomUUID().toString()); + + assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState()); + // validates idempotency + for (int i = 0; i < 2; i++) { + testProcNode.enable(); + } + assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState()); + testProcNode.disable(); + assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState()); + fc.shutdown(true); + } + + + @Test + public void validateDisableOperation() throws Exception { + FlowController fc = this.buildFlowControllerForTest(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); + this.setControllerRootGroup(fc, testGroup); + final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), + UUID.randomUUID().toString()); + testProcNode.setProperty("P", "hello"); + assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.STOPPED, testProcNode.getLogicalScheduledState()); + // validates idempotency + for (int i = 0; i < 2; i++) { + testProcNode.disable(); + } + assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); + assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState()); + + ProcessScheduler ps = fc.getProcessScheduler(); + ps.startProcessor(testProcNode); + assertEquals(ScheduledState.DISABLED, testProcNode.getLogicalScheduledState()); + + fc.shutdown(true); + } + /** * Will validate the idempotent nature of processor start operation which * can be called multiple times without any side-effects. @@ -276,10 +323,12 @@ public class TestProcessorLifecycle { ps.startProcessor(testProcNode); Thread.sleep(1000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING); + assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.RUNNING); ps.stopProcessor(testProcNode); Thread.sleep(100); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING); + assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.STOPPED); Thread.sleep(1000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); @@ -360,7 +409,7 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { - NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5000"); + NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec"); FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -390,7 +439,7 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { - NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5000"); + NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, "5 sec"); FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -406,9 +455,14 @@ public class TestProcessorLifecycle { assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING); Thread.sleep(1000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING); + ps.disableProcessor(testProcNode); // no effect + Thread.sleep(100); + assertTrue(testProcNode.getScheduledState() == ScheduledState.STARTING); + assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.RUNNING); ps.stopProcessor(testProcNode); Thread.sleep(100); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPING); + assertTrue(testProcNode.getLogicalScheduledState() == ScheduledState.STOPPED); Thread.sleep(4000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); fc.shutdown(true); @@ -435,6 +489,9 @@ public class TestProcessorLifecycle { ps.startProcessor(testProcNode); Thread.sleep(1000); assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + ps.disableProcessor(testProcNode); + Thread.sleep(100); + assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); ps.stopProcessor(testProcNode); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 652fd04..64ffdaf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,29 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.component.details.ComponentDetails; @@ -128,28 +151,6 @@ import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusDTO; -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -1643,7 +1644,7 @@ public final class DtoFactory { dto.setType(node.getProcessor().getClass().getCanonicalName()); dto.setName(node.getName()); - dto.setState(node.getScheduledState().toString()); + dto.setState(node.getLogicalScheduledState().toString()); // build the relationship dtos final List<RelationshipDTO> relationships = new ArrayList<>();
