NIFI-1464 addressed PR comments from @apiri and @markap14
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f53f45de Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f53f45de Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f53f45de Branch: refs/heads/master Commit: f53f45def3ed353232ec65d656bba4eee922b252 Parents: 0c5b1c2 Author: Oleg Zhurakousky <[email protected]> Authored: Wed Feb 17 09:10:15 2016 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Mar 11 12:54:50 2016 -0500 ---------------------------------------------------------------------- .../apache/nifi/controller/ProcessorNode.java | 4 +- .../nifi/controller/StandardProcessorNode.java | 6 +- .../scheduling/AbstractSchedulingAgent.java | 2 +- .../StandardControllerServiceProvider.java | 1 - .../TestStandardControllerServiceProvider.java | 122 ------------------- 5 files changed, 7 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index d7dbe24..ff7977d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -102,7 +102,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen /** * Will start the {@link Processor} represented by this - * {@link ProcessorNode}. Starting processor typically means invoking it's + * {@link ProcessorNode}. Starting processor typically means invoking its * operation that is annotated with @OnScheduled and then executing a * callback provided by the {@link ProcessScheduler} to which typically * initiates @@ -126,7 +126,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen /** * Will stop the {@link Processor} represented by this {@link ProcessorNode}. - * Stopping processor typically means invoking it's operation that is + * Stopping processor typically means invoking its operation that is * annotated with @OnUnschedule and then @OnStopped. * * @param scheduler http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 3e07995..dac56b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1374,8 +1374,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } }); - long onScheduleTimeout = Long.parseLong(NiFiProperties.getInstance() - .getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT, String.valueOf(Long.MAX_VALUE))); + String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_START_TIMEOUT); + long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE + : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); + try { executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java index b931c64..3544dac 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java @@ -23,7 +23,7 @@ import org.apache.nifi.controller.ReportingTaskNode; * Base implementation of the {@link SchedulingAgent} which encapsulates the * updates to the {@link ScheduleState} based on invoked operation and then * delegates to the corresponding 'do' methods. For example; By invoking - * {@link #schedule(Connectable, ScheduleState)} the the + * {@link #schedule(Connectable, ScheduleState)} the * {@link ScheduleState#setScheduled(boolean)} with value 'true' will be * invoked. * http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 3b9b073..77dc87e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -457,7 +457,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) { final Set<String> identifiers = new HashSet<>(); for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) { - Class<? extends ControllerService> c = entry.getValue().getProxiedControllerService().getClass(); if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) { identifiers.add(entry.getKey()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/f53f45de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 9b35238..11b73a8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -17,7 +17,6 @@ package org.apache.nifi.controller.service; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.beans.PropertyDescriptor; @@ -41,15 +40,11 @@ import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceB; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.StandardProcessGroup; -import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.StandardValidationContextFactory; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; public class TestStandardControllerServiceProvider { private static StateManagerProvider stateManagerProvider = new StateManagerProvider() { @@ -191,119 +186,6 @@ public class TestStandardControllerServiceProvider { } } - @Test(timeout = 10000) - @Ignore // this may be obsolete since TestProcessorLifecycle covers this - // scenario without mocks - public void testStartStopReferencingComponents() { - final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); - - // build a graph of reporting tasks and controller services with dependencies as such: - // - // Processor P1 -> A -> B -> D - // Processor P2 -> C ---^----^ - // - // In other words, Processor P1 references Controller Service A, which references B, which references D. - // AND - // Processor P2 references Controller Service C, which references B and D. - // - // So we have to verify that if D is enabled, when we enable its referencing services, - // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so - // until B is first enabled so ensure that we enable B first. - final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); - final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false); - final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); - final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); - - final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class); - Mockito.doAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0]; - procNode.verifyCanStart(); - // procNode.setScheduledState(ScheduledState.RUNNING); - return null; - } - }).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class)); - - Mockito.doAnswer(new Answer<Object>() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - final ProcessorNode procNode = (ProcessorNode) invocation.getArguments()[0]; - procNode.verifyCanStop(); - // procNode.setScheduledState(ScheduledState.STOPPED); - return null; - } - }).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class)); - - final String id1 = UUID.randomUUID().toString(); - final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1, - new StandardValidationContextFactory(provider), scheduler, provider); - procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider)); - procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1"); - procNodeA.setProcessGroup(mockProcessGroup); - - final String id2 = UUID.randomUUID().toString(); - final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2, - new StandardValidationContextFactory(provider), scheduler, provider); - procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider)); - procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3"); - procNodeB.setProcessGroup(mockProcessGroup); - - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); - - provider.enableControllerService(serviceNode4); - provider.enableReferencingServices(serviceNode4); - provider.scheduleReferencingComponents(serviceNode4); - - final Set<ControllerServiceState> enableStates = new HashSet<>(); - enableStates.add(ControllerServiceState.ENABLED); - enableStates.add(ControllerServiceState.ENABLING); - - while (serviceNode3.getState() != ControllerServiceState.ENABLED - || serviceNode2.getState() != ControllerServiceState.ENABLED - || serviceNode1.getState() != ControllerServiceState.ENABLED) { - assertTrue(enableStates.contains(serviceNode3.getState())); - assertTrue(enableStates.contains(serviceNode2.getState())); - assertTrue(enableStates.contains(serviceNode1.getState())); - } - assertTrue(procNodeA.isRunning()); - assertTrue(procNodeB.isRunning()); - - // stop processors and verify results. - provider.unscheduleReferencingComponents(serviceNode4); - assertFalse(procNodeA.isRunning()); - assertFalse(procNodeB.isRunning()); - while (serviceNode3.getState() != ControllerServiceState.ENABLED - || serviceNode2.getState() != ControllerServiceState.ENABLED - || serviceNode1.getState() != ControllerServiceState.ENABLED) { - assertTrue(enableStates.contains(serviceNode3.getState())); - assertTrue(enableStates.contains(serviceNode2.getState())); - assertTrue(enableStates.contains(serviceNode1.getState())); - } - - provider.disableReferencingServices(serviceNode4); - final Set<ControllerServiceState> disableStates = new HashSet<>(); - disableStates.add(ControllerServiceState.DISABLED); - disableStates.add(ControllerServiceState.DISABLING); - - // Wait for the services to be disabled. - while (serviceNode3.getState() != ControllerServiceState.DISABLED - || serviceNode2.getState() != ControllerServiceState.DISABLED - || serviceNode1.getState() != ControllerServiceState.DISABLED) { - assertTrue(disableStates.contains(serviceNode3.getState())); - assertTrue(disableStates.contains(serviceNode2.getState())); - assertTrue(disableStates.contains(serviceNode1.getState())); - } - - assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState()); - - provider.disableControllerService(serviceNode4); - assertTrue(disableStates.contains(serviceNode4.getState())); - } @Test public void testOrderingOfServices() { @@ -476,9 +358,5 @@ public class TestStandardControllerServiceProvider { // procNode.setScheduledState(ScheduledState.RUNNING); provider.unscheduleReferencingComponents(serviceNode); assertEquals(ScheduledState.STOPPED, procNode.getScheduledState()); - - // procNode.setScheduledState(ScheduledState.DISABLED); - // provider.unscheduleReferencingComponents(serviceNode); - // assertEquals(ScheduledState.DISABLED, procNode.getScheduledState()); } }
