NIFI-4: Added lifecycle annotation support

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d8e1f570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d8e1f570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d8e1f570

Branch: refs/heads/annotations
Commit: d8e1f570a68df152f1d29d60acf732a0f6b532ec
Parents: d734220
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jan 16 15:52:47 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri Jan 16 15:52:47 2015 -0500

----------------------------------------------------------------------
 .../service/ControllerServiceProvider.java      | 21 +++--
 .../apache/nifi/controller/FlowController.java  | 57 ++++++++++++-
 .../scheduling/StandardProcessScheduler.java    | 88 ++++++++++++++++++--
 .../StandardControllerServiceProvider.java      | 43 +++++++---
 .../processor/StandardSchedulingContext.java    |  4 +-
 .../org/apache/nifi/util/ReflectionUtils.java   | 62 ++++++++++++--
 6 files changed, 238 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 35a255d..03ed779 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -16,8 +16,7 @@
  */
 package org.apache.nifi.controller.service;
 
-import java.util.Map;
-
+import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
 /**
@@ -26,15 +25,15 @@ import org.apache.nifi.controller.ControllerServiceLookup;
 public interface ControllerServiceProvider extends ControllerServiceLookup {
 
     /**
-     * Gets the controller service for the specified identifier. Returns null 
if
-     * the identifier does not match a known service.
+     * Creates a new Controller Service of the given type and assigns it the 
given id. If <code>firstTimeadded</code>
+     * is true, calls any methods that are annotated with {@link OnAdded}
      *
      * @param type
      * @param id
-     * @param properties
+     * @param firstTimeAdded
      * @return
      */
-    ControllerServiceNode createControllerService(String type, String id, 
Map<String, String> properties);
+    ControllerServiceNode createControllerService(String type, String id, 
boolean firstTimeAdded);
 
     /**
      * Gets the controller service node for the specified identifier. Returns
@@ -44,4 +43,14 @@ public interface ControllerServiceProvider extends 
ControllerServiceLookup {
      * @return
      */
     ControllerServiceNode getControllerServiceNode(String id);
+    
+    /**
+     * Removes the given Controller Service from the flow. This will call all 
appropriate methods
+     * that have the @OnRemoved annotation.
+     * 
+     * @param serviceNode the controller service to remove
+     * 
+     * @throws IllegalStateException if the controller service is not disabled 
or is not a part of this flow
+     */
+    void removeControllerService(ControllerServiceNode serviceNode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
index 860ea2d..1d90a3a 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -50,6 +50,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.admin.service.UserService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -134,6 +135,7 @@ import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.logging.ProcessorLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarClassLoader;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.apache.nifi.processor.Processor;
@@ -2463,6 +2465,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, H
     }
 
     public ReportingTaskNode createReportingTask(final String type, String id) 
throws ReportingTaskInstantiationException {
+        return createReportingTask(type, id, true);
+    }
+    
+    public ReportingTaskNode createReportingTask(final String type, String id, 
final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
         if (type == null) {
             throw new NullPointerException();
         }
@@ -2484,7 +2490,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, H
             final Class<? extends ReportingTask> reportingTaskClass = 
rawClass.asSubclass(ReportingTask.class);
             final Object reportingTaskObj = reportingTaskClass.newInstance();
             task = reportingTaskClass.cast(reportingTaskObj);
-
         } catch (final ClassNotFoundException | SecurityException | 
InstantiationException | IllegalAccessException | IllegalArgumentException t) {
             throw new ReportingTaskInstantiationException(type, t);
         } finally {
@@ -2495,6 +2500,15 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, H
 
         final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(controllerServiceProvider);
         final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, 
id, this, processScheduler, validationContextFactory);
+        
+        if ( firstTimeAdded ) {
+            try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
task);
+            } catch (final Exception e) {
+                throw new ProcessorLifeCycleException("Failed to invoke 
On-Added Lifecycle methods of " + task, e);
+            }
+        }
+        
         reportingTasks.put(id, taskNode);
         return taskNode;
     }
@@ -2519,13 +2533,45 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, H
         processScheduler.unschedule(reportingTaskNode);
     }
 
+    public void removeReportingTask(final ReportingTaskNode reportingTaskNode) 
{
+        final ReportingTaskNode existing = 
reportingTasks.get(reportingTaskNode.getIdentifier());
+        if ( existing == null || existing != reportingTaskNode ) {
+            throw new IllegalStateException("Reporting Task " + 
reportingTaskNode + " does not exist in this Flow");
+        }
+        
+        reportingTaskNode.verifyCanDelete();
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, 
reportingTaskNode.getReportingTask(), 
reportingTaskNode.getConfigurationContext());
+        }
+        
+        reportingTasks.remove(reportingTaskNode.getIdentifier());
+    }
+    
     Collection<ReportingTaskNode> getReportingTasks() {
         return reportingTasks.values();
     }
 
+
+    public void enableReportingTask(final ReportingTaskNode reportingTaskNode) 
{
+        processScheduler.enableReportingTask(reportingTaskNode);
+    }
+    
+    public void disableReportingTask(final ReportingTaskNode 
reportingTaskNode) {
+        processScheduler.disableReportingTask(reportingTaskNode);
+    }
+    
+    public void enableControllerService(final ControllerServiceNode 
serviceNode) {
+        processScheduler.enableControllerService(serviceNode);
+    }
+    
+    public void disableControllerService(final ControllerServiceNode 
serviceNode) {
+        processScheduler.disableControllerService(serviceNode);
+    }
+
     @Override
-    public ControllerServiceNode createControllerService(final String type, 
final String id, final Map<String, String> properties) {
-        return controllerServiceProvider.createControllerService(type, 
id.intern(), properties);
+    public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
+        return controllerServiceProvider.createControllerService(type, 
id.intern(), firstTimeAdded);
     }
 
     @Override
@@ -2548,6 +2594,11 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, H
         return 
controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
     }
 
+    @Override
+    public void removeControllerService(final ControllerServiceNode 
serviceNode) {
+        controllerServiceProvider.removeControllerService(serviceNode);
+    }
+    
     //
     // Counters
     //

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index e565ebc..0653b03 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -27,6 +27,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -41,6 +43,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
@@ -514,14 +517,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     }
 
     @Override
-    public synchronized void disableProcessor(final ProcessorNode procNode) {
-        if (procNode.getScheduledState() != ScheduledState.STOPPED) {
-            throw new IllegalStateException("Processor cannot be disabled 
because its state is set to " + procNode.getScheduledState());
-        }
-        procNode.setScheduledState(ScheduledState.DISABLED);
-    }
-
-    @Override
     public synchronized void enablePort(final Port port) {
         if (port.getScheduledState() != ScheduledState.DISABLED) {
             throw new IllegalStateException("Funnel cannot be enabled because 
it is not disabled");
@@ -539,9 +534,84 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         if (procNode.getScheduledState() != ScheduledState.DISABLED) {
             throw new IllegalStateException("Processor cannot be enabled 
because it is not disabled");
         }
+        
         procNode.setScheduledState(ScheduledState.STOPPED);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            final ProcessorLog processorLog = new 
SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, 
procNode.getProcessor(), processorLog);
+        }
+    }
+
+    @Override
+    public synchronized void disableProcessor(final ProcessorNode procNode) {
+        if (procNode.getScheduledState() != ScheduledState.STOPPED) {
+            throw new IllegalStateException("Processor cannot be disabled 
because its state is set to " + procNode.getScheduledState());
+        }
+        
+        procNode.setScheduledState(ScheduledState.DISABLED);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            final ProcessorLog processorLog = new 
SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, 
procNode.getProcessor(), processorLog);
+        }
     }
 
+    public synchronized void enableReportingTask(final ReportingTaskNode 
taskNode) {
+        if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+            throw new IllegalStateException("Reporting Task cannot be enabled 
because it is not disabled");
+        }
+
+        taskNode.setScheduledState(ScheduledState.STOPPED);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, 
taskNode.getReportingTask());
+        }
+    }
+    
+    public synchronized void disableReportingTask(final ReportingTaskNode 
taskNode) {
+        if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) {
+            throw new IllegalStateException("Reporting Task cannot be disabled 
because its state is set to " + taskNode.getScheduledState() + " but transition 
to DISABLED state is allowed only from the STOPPED state");
+        }
+
+        taskNode.setScheduledState(ScheduledState.DISABLED);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, 
taskNode.getReportingTask());
+        }
+    }
+
+    public synchronized void enableControllerService(final 
ControllerServiceNode serviceNode) {
+        if ( !serviceNode.isDisabled() ) {
+            throw new IllegalStateException("Controller Service cannot be 
enabled because it is not disabled");
+        }
+
+        // we set the service to enabled before invoking the @OnEnabled 
methods. We do this because it must be
+        // done in this order for disabling (serviceNode.setDisabled(true) 
will throw Exceptions if the service
+        // is currently known to be in use) and we want to be consistent with 
the ordering of calling setDisabled
+        // before annotated methods.
+        serviceNode.setDisabled(false);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, 
serviceNode.getControllerServiceImplementation());
+        }
+    }
+    
+    public synchronized void disableControllerService(final 
ControllerServiceNode serviceNode) {
+        if ( serviceNode.isDisabled() ) {
+            throw new IllegalStateException("Controller Service cannot be 
disabled because it is already disabled");
+        }
+
+        // We must set the service to disabled before we invoke the OnDisabled 
methods because the service node
+        // can throw Exceptions if we attempt to disable the service while 
it's known to be in use.
+        serviceNode.setDisabled(true);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, 
serviceNode.getControllerServiceImplementation());
+        }
+    }
+    
+    
     @Override
     public boolean isScheduled(final Object scheduled) {
         final ScheduleState scheduleState = scheduleStates.get(scheduled);
@@ -549,7 +619,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     }
 
     /**
-     * Returns the ScheduleState that is registered for the given 
ProcessorNode;
+     * Returns the ScheduleState that is registered for the given component;
      * if no ScheduleState current is registered, one is created and registered
      * atomically, and then that value is returned.
      *

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index fc07ce1..bf0039a 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -30,17 +30,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.annotation.OnConfigured;
 import 
org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
 import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.ReflectionUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,7 +96,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
     }
 
     @Override
-    public ControllerServiceNode createControllerService(final String type, 
final String id, final Map<String, String> properties) {
+    public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
         if (type == null || id == null) {
             throw new NullPointerException();
         }
@@ -139,15 +142,18 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
 
             final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(this);
 
-            final ControllerServiceNode serviceNode = new 
StandardControllerServiceNode(proxiedService, id, validationContextFactory, 
this);
+            final ControllerServiceNode serviceNode = new 
StandardControllerServiceNode(proxiedService, originalService, id, 
validationContextFactory, this);
             serviceNodeHolder.set(serviceNode);
             serviceNode.setAnnotationData(null);
             serviceNode.setName(id);
-            for (final Map.Entry<String, String> entry : 
properties.entrySet()) {
-                serviceNode.setProperty(entry.getKey(), entry.getValue());
+            
+            if ( firstTimeAdded ) {
+                try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                    ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
originalService);
+                } catch (final Exception e) {
+                    throw new ProcessorLifeCycleException("Failed to invoke 
On-Added Lifecycle methods of " + originalService, e);
+                }
             }
-            final StandardConfigurationContext configurationContext = new 
StandardConfigurationContext(serviceNode, this);
-            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigured.class, 
originalService, configurationContext);
 
             this.controllerServices.put(id, serviceNode);
             return serviceNode;
@@ -163,7 +169,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
     @Override
     public ControllerService getControllerService(final String 
serviceIdentifier) {
         final ControllerServiceNode node = 
controllerServices.get(serviceIdentifier);
-        return (node == null) ? null : node.getControllerService();
+        return (node == null) ? null : node.getProxiedControllerService();
     }
 
     @Override
@@ -186,11 +192,28 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
     public Set<String> getControllerServiceIdentifiers(final Class<? extends 
ControllerService> serviceType) {
         final Set<String> identifiers = new HashSet<>();
         for (final Map.Entry<String, ControllerServiceNode> entry : 
controllerServices.entrySet()) {
-            if 
(requireNonNull(serviceType).isAssignableFrom(entry.getValue().getControllerService().getClass()))
 {
+            if 
(requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass()))
 {
                 identifiers.add(entry.getKey());
             }
         }
 
         return identifiers;
     }
+    
+    @Override
+    public void removeControllerService(final ControllerServiceNode 
serviceNode) {
+        final ControllerServiceNode existing = 
controllerServices.get(serviceNode.getIdentifier());
+        if ( existing == null || existing != serviceNode ) {
+            throw new IllegalStateException("Controller Service " + 
serviceNode + " does not exist in this Flow");
+        }
+        
+        serviceNode.verifyCanDelete();
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            final ConfigurationContext configurationContext = new 
StandardConfigurationContext(serviceNode, this);
+            
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, 
serviceNode.getControllerServiceImplementation(), configurationContext);
+        }
+        
+        controllerServices.remove(serviceNode.getIdentifier());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 318901f..ac58504 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -46,11 +46,11 @@ public class StandardSchedulingContext implements 
SchedulingContext {
         }
 
         if (serviceNode.isDisabled()) {
-            throw new IllegalStateException("Cannot lease Controller Service 
because Controller Service " + serviceNode.getControllerService() + " is 
currently disabled");
+            throw new IllegalStateException("Cannot lease Controller Service 
because Controller Service " + serviceNode.getProxiedControllerService() + " is 
currently disabled");
         }
 
         if (!serviceNode.isValid()) {
-            throw new IllegalStateException("Cannot lease Controller Service 
because Controller Service " + serviceNode.getControllerService() + " is not 
currently valid");
+            throw new IllegalStateException("Cannot lease Controller Service 
because Controller Service " + serviceNode.getProxiedControllerService() + " is 
not currently valid");
         }
 
         serviceNode.addReference(processorNode);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
index f8e7da4..a8a4596 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.nifi.logging.ProcessorLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,7 +149,28 @@ public class ReflectionUtils {
      * is returned, an error will have been logged.
      */
     public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? 
extends Annotation> annotation, final Object instance, final Object... args) {
-        return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, 
args);
+        return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, 
null, args);
+    }
+    
+    
+    /**
+     * Invokes all methods on the given instance that have been annotated with
+     * the given Annotation. If the signature of the method that is defined in
+     * <code>instance</code> uses 1 or more parameters, those parameters must 
be
+     * specified by the <code>args</code> parameter. However, if more arguments
+     * are supplied by the <code>args</code> parameter than needed, the extra
+     * arguments will be ignored.
+     *
+     * @param annotation
+     * @param instance
+     * @param args
+     * @return <code>true</code> if all appropriate methods were invoked and
+     * returned without throwing an Exception, <code>false</code> if one of the
+     * methods threw an Exception or could not be invoked; if 
<code>false</code>
+     * is returned, an error will have been logged.
+     */
+    public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? 
extends Annotation> annotation, final Object instance, final ProcessorLog 
logger, final Object... args) {
+        return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, 
logger, args);
     }
     
     
@@ -165,13 +187,15 @@ public class ReflectionUtils {
      * @param preferredAnnotation
      * @param alternateAnnotation
      * @param instance
+     * @param logger the ProcessorLog to use for logging any errors. If null, 
will use own logger, but that will not generate bulletins
+     *          or easily tie to the Processor's log messages.
      * @param args
      * @return <code>true</code> if all appropriate methods were invoked and
      * returned without throwing an Exception, <code>false</code> if one of the
      * methods threw an Exception or could not be invoked; if 
<code>false</code>
      * is returned, an error will have been logged.
      */
-    public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? 
extends Annotation> preferredAnnotation, final Class<? extends Annotation> 
alternateAnnotation, final Object instance, final Object... args) {
+    public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? 
extends Annotation> preferredAnnotation, final Class<? extends Annotation> 
alternateAnnotation, final Object instance, final ProcessorLog logger, final 
Object... args) {
         final List<Class<? extends Annotation>> annotationClasses = new 
ArrayList<>(alternateAnnotation == null ? 1 : 2);
         annotationClasses.add(preferredAnnotation);
         if ( alternateAnnotation != null ) {
@@ -194,16 +218,28 @@ public class ReflectionUtils {
                     try {
                         final Class<?>[] argumentTypes = 
method.getParameterTypes();
                         if (argumentTypes.length > args.length) {
-                            LOG.error("Unable to invoke method {} on {} 
because method expects {} parameters but only {} were given",
+                            if ( logger == null ) {
+                                LOG.error("Unable to invoke method {} on {} 
because method expects {} parameters but only {} were given",
                                     new Object[]{method.getName(), instance, 
argumentTypes.length, args.length});
+                            } else {
+                                logger.error("Unable to invoke method {} on {} 
because method expects {} parameters but only {} were given",
+                                        new Object[]{method.getName(), 
instance, argumentTypes.length, args.length});
+                            }
+                            
                             return false;
                         }
     
                         for (int i = 0; i < argumentTypes.length; i++) {
                             final Class<?> argType = argumentTypes[i];
                             if (!argType.isAssignableFrom(args[i].getClass())) 
{
-                                LOG.error("Unable to invoke method {} on {} 
because method parameter {} is expected to be of type {} but argument passed 
was of type {}",
+                                if ( logger == null ) {
+                                    LOG.error("Unable to invoke method {} on 
{} because method parameter {} is expected to be of type {} but argument passed 
was of type {}",
                                         new Object[]{method.getName(), 
instance, i, argType, args[i].getClass()});
+                                } else {
+                                    logger.error("Unable to invoke method {} 
on {} because method parameter {} is expected to be of type {} but argument 
passed was of type {}",
+                                            new Object[]{method.getName(), 
instance, i, argType, args[i].getClass()});
+                                }
+                                
                                 return false;
                             }
                         }
@@ -219,9 +255,21 @@ public class ReflectionUtils {
     
                                 method.invoke(instance, argsToPass);
                             }
-                        } catch (final IllegalAccessException | 
IllegalArgumentException | InvocationTargetException t) {
-                            LOG.error("Unable to invoke method {} on {} due to 
{}", new Object[]{method.getName(), instance, t});
-                            LOG.error("", t);
+                        } catch (final InvocationTargetException ite) {
+                            if ( logger == null ) {
+                                LOG.error("Unable to invoke method {} on {} 
due to {}", new Object[]{method.getName(), instance, ite.getCause()});
+                                LOG.error("", ite.getCause());
+                            } else {
+                                logger.error("Unable to invoke method {} on {} 
due to {}", new Object[]{method.getName(), instance, ite.getCause()});
+                            }
+                        } catch (final IllegalAccessException | 
IllegalArgumentException t) {
+                            if ( logger == null ) {
+                                LOG.error("Unable to invoke method {} on {} 
due to {}", new Object[]{method.getName(), instance, t});
+                                LOG.error("", t);
+                            } else {
+                                logger.error("Unable to invoke method {} on {} 
due to {}", new Object[]{method.getName(), instance, t});
+                            }
+                            
                             return false;
                         }
                     } finally {

Reply via email to