NIFI-1464 addressed latest PR comments

NIFI-1464 polishing


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/48af0bfb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/48af0bfb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/48af0bfb

Branch: refs/heads/master
Commit: 48af0bfbc5cafe06e003b2bd086b970cf49b5025
Parents: f53f45d
Author: Oleg Zhurakousky <[email protected]>
Authored: Fri Feb 26 12:42:48 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Mar 11 12:54:50 2016 -0500

----------------------------------------------------------------------
 .../apache/nifi/controller/ProcessorNode.java   | 55 ++++++++++++++++++
 .../nifi/controller/StandardFlowSerializer.java |  4 +-
 .../nifi/controller/StandardProcessorNode.java  |  9 +--
 .../scheduling/StandardProcessScheduler.java    |  8 +--
 .../scheduling/TestProcessorLifecycle.java      | 61 +++++++++++++++++++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java | 47 +++++++--------
 6 files changed, 143 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index ff7977d..a428349 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.connectable.Connectable;
@@ -31,12 +32,19 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class ProcessorNode extends AbstractConfiguredComponent 
implements Connectable {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(ProcessorNode.class);
+
+    protected final AtomicReference<ScheduledState> scheduledState;
+
     public ProcessorNode(final Processor processor, final String id,
         final ValidationContextFactory validationContextFactory, final 
ControllerServiceProvider serviceProvider) {
         super(processor, id, validationContextFactory, serviceProvider);
+        this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
     }
 
     public abstract boolean isIsolated();
@@ -101,6 +109,31 @@ public abstract class ProcessorNode extends 
AbstractConfiguredComponent implemen
     public abstract void verifyCanStart(Set<ControllerServiceNode> 
ignoredReferences);
 
     /**
+     * Returns the current scheduled state of this processor.
+     */
+    @Override
+    public ScheduledState getScheduledState() {
+        return this.scheduledState.get();
+    }
+
+    /**
+     * Returns the logical state of this processor. Logical state ignores
+     * transition states such as STOPPING and STARTING, rounding them up to the
+     * next logical state of STOPPED and RUNNING respectively.
+     *
+     * @return the logical state of this processor [DISABLED, STOPPED, RUNNING]
+     */
+    public ScheduledState getLogicalScheduledState() {
+        ScheduledState sc = this.scheduledState.get();
+        if (sc == ScheduledState.STARTING) {
+            return ScheduledState.RUNNING;
+        } else if (sc == ScheduledState.STOPPING) {
+            return ScheduledState.STOPPED;
+        }
+        return sc;
+    }
+
+    /**
      * Will start the {@link Processor} represented by this
      * {@link ProcessorNode}. Starting processor typically means invoking its
      * operation that is annotated with @OnScheduled and then executing a
@@ -145,4 +178,26 @@ public abstract class ProcessorNode extends 
AbstractConfiguredComponent implemen
      */
     public abstract <T extends ProcessContext & ControllerServiceLookup> void 
stop(ScheduledExecutorService scheduler,
             T processContext, Callable<Boolean> activeThreadMonitorCallback);
+
+    /**
+     * Will set the state of the processor to STOPPED which essentially implies
+     * that this processor can be started. This is an idempotent operation and 
will
+     * result in a WARN message if the processor cannot be enabled.
+     */
+    public void enable() {
+        if (!this.scheduledState.compareAndSet(ScheduledState.DISABLED, 
ScheduledState.STOPPED)) {
+            logger.warn("Processor cannot be enabled because it is not 
disabled");
+        }
+    }
+
+    /**
+     * Will set the state of the processor to DISABLED which essentially 
implies
+     * that this processor can NOT be started. This is an idempotent operation and
+     * will result in a WARN message if the processor cannot be disabled.
+     */
+    public void disable() {
+        if (!this.scheduledState.compareAndSet(ScheduledState.STOPPED, 
ScheduledState.DISABLED)) {
+            logger.warn("Processor cannot be disabled because its state is set 
to " + this.scheduledState);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
index 1ee85a2..e9f958c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
@@ -255,7 +255,7 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         addTextElement(element, "comments", port.getComments());
         addTextElement(element, "scheduledState", 
port.getScheduledState().name());
         addTextElement(element, "maxConcurrentTasks", 
port.getMaxConcurrentTasks());
-        addTextElement(element, "useCompression", 
String.valueOf(((RemoteGroupPort) port).isUseCompression()));
+        addTextElement(element, "useCompression", 
String.valueOf(port.isUseCompression()));
 
         parentElement.appendChild(element);
     }
@@ -311,7 +311,7 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
         addTextElement(element, "bulletinLevel", 
processor.getBulletinLevel().toString());
         addTextElement(element, "lossTolerant", 
String.valueOf(processor.isLossTolerant()));
-        addTextElement(element, "scheduledState", 
processor.getScheduledState().name());
+        addTextElement(element, "scheduledState", 
processor.getLogicalScheduledState().name());
         addTextElement(element, "schedulingStrategy", 
processor.getSchedulingStrategy().name());
         addTextElement(element, "runDurationNanos", 
processor.getRunDuration(TimeUnit.NANOSECONDS));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index dac56b5..2a0819c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -106,7 +106,6 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     private final AtomicReference<List<Connection>> incomingConnectionsRef;
     private final AtomicBoolean isolated;
     private final AtomicBoolean lossTolerant;
-    private final AtomicReference<ScheduledState> scheduledState;
     private final AtomicReference<String> comments;
     private final AtomicReference<Position> position;
     private final AtomicReference<String> annotationData;
@@ -145,7 +144,6 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         destinations = new HashMap<>();
         connections = new HashMap<>();
         incomingConnectionsRef = new AtomicReference<List<Connection>>(new 
ArrayList<Connection>());
-        scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
         lossTolerant = new AtomicBoolean(false);
         final Set<Relationship> emptySetOfRelationships = new HashSet<>();
         undefinedRelationshipsToTerminate = new 
AtomicReference<>(emptySetOfRelationships);
@@ -214,11 +212,6 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
     }
 
     @Override
-    public ScheduledState getScheduledState() {
-        return this.scheduledState.get();
-    }
-
-    @Override
     public Position getPosition() {
         return position.get();
     }
@@ -1276,7 +1269,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
             };
             taskScheduler.execute(startProcRunnable);
         } else {
-            LOG.warn("Can not start Processor since it's already in the 
process of being started");
+            LOG.warn("Can not start Processor since it's already in the 
process of being started or it is DISABLED");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 84d667f..bbaa23b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -473,16 +473,12 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     @Override
     public synchronized void enableProcessor(final ProcessorNode procNode) {
-        if (procNode.getScheduledState() != ScheduledState.DISABLED) {
-            throw new IllegalStateException("Processor cannot be enabled 
because it is not disabled");
-        }
+        procNode.enable();
     }
 
     @Override
     public synchronized void disableProcessor(final ProcessorNode procNode) {
-        if (procNode.getScheduledState() != ScheduledState.STOPPED) {
-            throw new IllegalStateException("Processor cannot be disabled 
because its state is set to " + procNode.getScheduledState());
-        }
+        procNode.disable();
     }
 
     public synchronized void enableReportingTask(final ReportingTaskNode 
taskNode) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index 2cce14d..d78009c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -89,6 +89,53 @@ public class TestProcessorLifecycle {
         FileUtils.deleteDirectory(new File("./target/content_repository"));
     }
 
+    @Test
+    public void validateEnableOperation() throws Exception {
+        FlowController fc = this.buildFlowControllerForTest();
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(),
+                UUID.randomUUID().toString());
+
+        assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
+        assertEquals(ScheduledState.STOPPED, 
testProcNode.getLogicalScheduledState());
+        // validates idempotency
+        for (int i = 0; i < 2; i++) {
+            testProcNode.enable();
+        }
+        assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
+        assertEquals(ScheduledState.STOPPED, 
testProcNode.getLogicalScheduledState());
+        testProcNode.disable();
+        assertEquals(ScheduledState.DISABLED, 
testProcNode.getScheduledState());
+        assertEquals(ScheduledState.DISABLED, 
testProcNode.getLogicalScheduledState());
+        fc.shutdown(true);
+    }
+
+
+    @Test
+    public void validateDisableOperation() throws Exception {
+        FlowController fc = this.buildFlowControllerForTest();
+        ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
+        this.setControllerRootGroup(fc, testGroup);
+        final ProcessorNode testProcNode = 
fc.createProcessor(TestProcessor.class.getName(),
+                UUID.randomUUID().toString());
+        testProcNode.setProperty("P", "hello");
+        assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
+        assertEquals(ScheduledState.STOPPED, 
testProcNode.getLogicalScheduledState());
+        // validates idempotency
+        for (int i = 0; i < 2; i++) {
+            testProcNode.disable();
+        }
+        assertEquals(ScheduledState.DISABLED, 
testProcNode.getScheduledState());
+        assertEquals(ScheduledState.DISABLED, 
testProcNode.getLogicalScheduledState());
+
+        ProcessScheduler ps = fc.getProcessScheduler();
+        ps.startProcessor(testProcNode);
+        assertEquals(ScheduledState.DISABLED, 
testProcNode.getLogicalScheduledState());
+
+        fc.shutdown(true);
+    }
+
     /**
      * Will validate the idempotent nature of processor start operation which
      * can be called multiple times without any side-effects.
@@ -276,10 +323,12 @@ public class TestProcessorLifecycle {
         ps.startProcessor(testProcNode);
         Thread.sleep(1000);
         assertTrue(testProcNode.getScheduledState() == 
ScheduledState.STARTING);
+        assertTrue(testProcNode.getLogicalScheduledState() == 
ScheduledState.RUNNING);
 
         ps.stopProcessor(testProcNode);
         Thread.sleep(100);
         assertTrue(testProcNode.getScheduledState() == 
ScheduledState.STOPPING);
+        assertTrue(testProcNode.getLogicalScheduledState() == 
ScheduledState.STOPPED);
         Thread.sleep(1000);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
 
@@ -360,7 +409,7 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() 
throws Exception {
-        
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT,
 "5000");
+        
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT,
 "5 sec");
         FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
@@ -390,7 +439,7 @@ public class TestProcessorLifecycle {
      */
     @Test
     public void 
validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() 
throws Exception {
-        
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT,
 "5000");
+        
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_START_TIMEOUT,
 "5 sec");
         FlowController fc = this.buildFlowControllerForTest();
         ProcessGroup testGroup = 
fc.createProcessGroup(UUID.randomUUID().toString());
         this.setControllerRootGroup(fc, testGroup);
@@ -406,9 +455,14 @@ public class TestProcessorLifecycle {
         assertTrue(testProcNode.getScheduledState() == 
ScheduledState.STARTING);
         Thread.sleep(1000);
         assertTrue(testProcNode.getScheduledState() == 
ScheduledState.STARTING);
+        ps.disableProcessor(testProcNode); // no effect
+        Thread.sleep(100);
+        assertTrue(testProcNode.getScheduledState() == 
ScheduledState.STARTING);
+        assertTrue(testProcNode.getLogicalScheduledState() == 
ScheduledState.RUNNING);
         ps.stopProcessor(testProcNode);
         Thread.sleep(100);
         assertTrue(testProcNode.getScheduledState() == 
ScheduledState.STOPPING);
+        assertTrue(testProcNode.getLogicalScheduledState() == 
ScheduledState.STOPPED);
         Thread.sleep(4000);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
         fc.shutdown(true);
@@ -435,6 +489,9 @@ public class TestProcessorLifecycle {
         ps.startProcessor(testProcNode);
         Thread.sleep(1000);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
+        ps.disableProcessor(testProcNode);
+        Thread.sleep(100);
+        assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
         ps.stopProcessor(testProcNode);
         Thread.sleep(500);
         assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);

http://git-wip-us.apache.org/repos/asf/nifi/blob/48af0bfb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 652fd04..64ffdaf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,29 @@
  */
 package org.apache.nifi.web.api.dto;
 
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.component.details.ComponentDetails;
@@ -128,28 +151,6 @@ import 
org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
 import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusDTO;
 
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-
 public final class DtoFactory {
 
     @SuppressWarnings("rawtypes")
@@ -1643,7 +1644,7 @@ public final class DtoFactory {
 
         dto.setType(node.getProcessor().getClass().getCanonicalName());
         dto.setName(node.getName());
-        dto.setState(node.getScheduledState().toString());
+        dto.setState(node.getLogicalScheduledState().toString());
 
         // build the relationship dtos
         final List<RelationshipDTO> relationships = new ArrayList<>();

Reply via email to