http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index d8f1338..a45bf76 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -61,9 +61,6 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class StandardControllerServiceProvider implements 
ControllerServiceProvider {
 
     private static final Logger logger = 
LoggerFactory.getLogger(StandardControllerServiceProvider.class);
@@ -112,24 +109,24 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
             populateInterfaces(superClass, interfacesDefinedThusFar);
         }
     }
-    
+
     @Override
     public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
         if (type == null || id == null) {
             throw new NullPointerException();
         }
-        
+
         final ClassLoader currentContextClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             final ClassLoader cl = ExtensionManager.getClassLoader(type);
             final Class<?> rawClass;
-            if ( cl == null ) {
+            if (cl == null) {
                 rawClass = Class.forName(type);
             } else {
                 Thread.currentThread().setContextClassLoader(cl);
                 rawClass = Class.forName(type, false, cl);
             }
-            
+
             final Class<? extends ControllerService> controllerServiceClass = 
rawClass.asSubclass(ControllerService.class);
 
             final ControllerService originalService = 
controllerServiceClass.newInstance();
@@ -138,11 +135,11 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
                 @Override
                 public Object invoke(final Object proxy, final Method method, 
final Object[] args) throws Throwable {
 
-                       final String methodName = method.getName();
-                       if("initialize".equals(methodName) || 
"onPropertyModified".equals(methodName)){
-                               throw new UnsupportedOperationException(method 
+ " may only be invoked by the NiFi framework");
-                       }
-                       
+                    final String methodName = method.getName();
+                    if ("initialize".equals(methodName) || 
"onPropertyModified".equals(methodName)) {
+                        throw new UnsupportedOperationException(method + " may 
only be invoked by the NiFi framework");
+                    }
+
                     final ControllerServiceNode node = serviceNodeHolder.get();
                     final ControllerServiceState state = node.getState();
                     final boolean disabled = (state != 
ControllerServiceState.ENABLED); // only allow method call if service state is 
ENABLED.
@@ -166,7 +163,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
             };
 
             final ControllerService proxiedService;
-            if ( cl == null ) {
+            if (cl == null) {
                 proxiedService = (ControllerService) 
Proxy.newProxyInstance(getClass().getClassLoader(), 
getInterfaces(controllerServiceClass), invocationHandler);
             } else {
                 proxiedService = (ControllerService) 
Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), 
invocationHandler);
@@ -181,8 +178,8 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
             final ControllerServiceNode serviceNode = new 
StandardControllerServiceNode(proxiedService, originalService, id, 
validationContextFactory, this);
             serviceNodeHolder.set(serviceNode);
             serviceNode.setName(rawClass.getSimpleName());
-            
-            if ( firstTimeAdded ) {
+
+            if (firstTimeAdded) {
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
                     ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, 
originalService);
                 } catch (final Exception e) {
@@ -200,226 +197,227 @@ public class StandardControllerServiceProvider 
implements ControllerServiceProvi
             }
         }
     }
-    
-    
-    
+
     @Override
     public void disableReferencingServices(final ControllerServiceNode 
serviceNode) {
         // Get a list of all Controller Services that need to be disabled, in 
the order that they need to be
         // disabled.
         final List<ControllerServiceNode> toDisable = 
findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
-        
-        for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+
+        for (final ControllerServiceNode nodeToDisable : toDisable) {
             final ControllerServiceState state = nodeToDisable.getState();
-            
-            if ( state != ControllerServiceState.DISABLED && state != 
ControllerServiceState.DISABLING ) {
+
+            if (state != ControllerServiceState.DISABLED && state != 
ControllerServiceState.DISABLING) {
                 nodeToDisable.verifyCanDisable(serviceSet);
             }
         }
-        
+
         Collections.reverse(toDisable);
-        for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+        for (final ControllerServiceNode nodeToDisable : toDisable) {
             final ControllerServiceState state = nodeToDisable.getState();
-            
-            if ( state != ControllerServiceState.DISABLED && state != 
ControllerServiceState.DISABLING ) {
+
+            if (state != ControllerServiceState.DISABLED && state != 
ControllerServiceState.DISABLING) {
                 disableControllerService(nodeToDisable);
             }
         }
     }
-    
-    
+
     @Override
     public void scheduleReferencingComponents(final ControllerServiceNode 
serviceNode) {
         // find all of the schedulable components (processors, reporting 
tasks) that refer to this Controller Service,
         // or a service that references this controller service, etc.
         final List<ProcessorNode> processors = 
findRecursiveReferences(serviceNode, ProcessorNode.class);
         final List<ReportingTaskNode> reportingTasks = 
findRecursiveReferences(serviceNode, ReportingTaskNode.class);
-        
+
         // verify that  we can start all components (that are not disabled) 
before doing anything
-        for ( final ProcessorNode node : processors ) {
-            if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+        for (final ProcessorNode node : processors) {
+            if (node.getScheduledState() != ScheduledState.DISABLED) {
                 node.verifyCanStart();
             }
         }
-        for ( final ReportingTaskNode node : reportingTasks ) {
-            if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+        for (final ReportingTaskNode node : reportingTasks) {
+            if (node.getScheduledState() != ScheduledState.DISABLED) {
                 node.verifyCanStart();
             }
         }
-        
+
         // start all of the components that are not disabled
-        for ( final ProcessorNode node : processors ) {
-            if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+        for (final ProcessorNode node : processors) {
+            if (node.getScheduledState() != ScheduledState.DISABLED) {
                 node.getProcessGroup().startProcessor(node);
             }
         }
-        for ( final ReportingTaskNode node : reportingTasks ) {
-            if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+        for (final ReportingTaskNode node : reportingTasks) {
+            if (node.getScheduledState() != ScheduledState.DISABLED) {
                 processScheduler.schedule(node);
             }
         }
     }
-    
+
     @Override
     public void unscheduleReferencingComponents(final ControllerServiceNode 
serviceNode) {
         // find all of the schedulable components (processors, reporting 
tasks) that refer to this Controller Service,
         // or a service that references this controller service, etc.
         final List<ProcessorNode> processors = 
findRecursiveReferences(serviceNode, ProcessorNode.class);
         final List<ReportingTaskNode> reportingTasks = 
findRecursiveReferences(serviceNode, ReportingTaskNode.class);
-        
+
         // verify that  we can stop all components (that are running) before 
doing anything
-        for ( final ProcessorNode node : processors ) {
-            if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+        for (final ProcessorNode node : processors) {
+            if (node.getScheduledState() == ScheduledState.RUNNING) {
                 node.verifyCanStop();
             }
         }
-        for ( final ReportingTaskNode node : reportingTasks ) {
-            if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+        for (final ReportingTaskNode node : reportingTasks) {
+            if (node.getScheduledState() == ScheduledState.RUNNING) {
                 node.verifyCanStop();
             }
         }
-        
+
         // stop all of the components that are running
-        for ( final ProcessorNode node : processors ) {
-            if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+        for (final ProcessorNode node : processors) {
+            if (node.getScheduledState() == ScheduledState.RUNNING) {
                 node.getProcessGroup().stopProcessor(node);
             }
         }
-        for ( final ReportingTaskNode node : reportingTasks ) {
-            if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+        for (final ReportingTaskNode node : reportingTasks) {
+            if (node.getScheduledState() == ScheduledState.RUNNING) {
                 processScheduler.unschedule(node);
             }
         }
     }
-    
+
     @Override
     public void enableControllerService(final ControllerServiceNode 
serviceNode) {
         serviceNode.verifyCanEnable();
         processScheduler.enableControllerService(serviceNode);
     }
-    
+
     @Override
     public void enableControllerServices(final 
Collection<ControllerServiceNode> serviceNodes) {
         final Set<ControllerServiceNode> servicesToEnable = new HashSet<>();
         // Ensure that all nodes are already disabled
-        for ( final ControllerServiceNode serviceNode : serviceNodes ) {
+        for (final ControllerServiceNode serviceNode : serviceNodes) {
             final ControllerServiceState curState = serviceNode.getState();
-            if ( ControllerServiceState.DISABLED.equals(curState) ) {
+            if (ControllerServiceState.DISABLED.equals(curState)) {
                 servicesToEnable.add(serviceNode);
             } else {
                 logger.warn("Cannot enable {} because it is not disabled; 
current state is {}", serviceNode, curState);
             }
         }
-        
+
         // determine the order to load the services. We have to ensure that if 
service A references service B, then B
         // is enabled first, and so on.
         final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>();
-        for ( final ControllerServiceNode node : servicesToEnable ) {
+        for (final ControllerServiceNode node : servicesToEnable) {
             idToNodeMap.put(node.getIdentifier(), node);
         }
-        
+
         // We can have many Controller Services dependent on one another. We 
can have many of these
         // disparate lists of Controller Services that are dependent on one 
another. We refer to each
         // of these as a branch.
         final List<List<ControllerServiceNode>> branches = 
determineEnablingOrder(idToNodeMap);
 
-        if ( branches.isEmpty() ) {
+        if (branches.isEmpty()) {
             logger.info("No Controller Services to enable");
             return;
         } else {
             logger.info("Will enable {} Controller Services", 
servicesToEnable.size());
         }
-        
+
         // Mark all services that are configured to be enabled as 'ENABLING'. 
This allows Processors, reporting tasks
         // to be valid so that they can be scheduled.
-        for ( final List<ControllerServiceNode> branch : branches ) {
-            for ( final ControllerServiceNode nodeToEnable : branch ) {
+        for (final List<ControllerServiceNode> branch : branches) {
+            for (final ControllerServiceNode nodeToEnable : branch) {
                 nodeToEnable.setState(ControllerServiceState.ENABLING);
             }
         }
-        
+
         final Set<ControllerServiceNode> enabledNodes = 
Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
         final ExecutorService executor = 
Executors.newFixedThreadPool(Math.min(10, branches.size()));
-        for ( final List<ControllerServiceNode> branch : branches ) {
+        for (final List<ControllerServiceNode> branch : branches) {
             final Runnable enableBranchRunnable = new Runnable() {
                 @Override
                 public void run() {
                     logger.debug("Enabling Controller Service Branch {}", 
branch);
-                    
-                    for ( final ControllerServiceNode serviceNode : branch ) {
+
+                    for (final ControllerServiceNode serviceNode : branch) {
                         try {
-                            if ( !enabledNodes.contains(serviceNode) ) {
+                            if (!enabledNodes.contains(serviceNode)) {
                                 enabledNodes.add(serviceNode);
-                                
+
                                 logger.info("Enabling {}", serviceNode);
                                 try {
                                     
processScheduler.enableControllerService(serviceNode);
                                 } catch (final Exception e) {
                                     logger.error("Failed to enable " + 
serviceNode + " due to " + e);
-                                    if ( logger.isDebugEnabled() ) {
+                                    if (logger.isDebugEnabled()) {
                                         logger.error("", e);
                                     }
-                                    
-                                    if ( bulletinRepo != null ) {
+
+                                    if (bulletinRepo != null) {
                                         
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
-                                            "Controller Service", 
Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
+                                                "Controller Service", 
Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
                                     }
                                 }
                             }
-                            
+
                             // wait for service to finish enabling.
-                            while ( 
ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) {
+                            while 
(ControllerServiceState.ENABLING.equals(serviceNode.getState())) {
                                 try {
                                     Thread.sleep(100L);
-                                } catch (final InterruptedException ie) {}
+                                } catch (final InterruptedException ie) {
+                                }
                             }
-                            
+
                             logger.info("State for {} is now {}", serviceNode, 
serviceNode.getState());
                         } catch (final Exception e) {
                             logger.error("Failed to enable {} due to {}", 
serviceNode, e.toString());
-                            if ( logger.isDebugEnabled() ) {
+                            if (logger.isDebugEnabled()) {
                                 logger.error("", e);
                             }
                         }
                     }
                 }
             };
-            
+
             executor.submit(enableBranchRunnable);
         }
-        
+
         executor.shutdown();
     }
-    
+
     static List<List<ControllerServiceNode>> determineEnablingOrder(final 
Map<String, ControllerServiceNode> serviceNodeMap) {
         final List<List<ControllerServiceNode>> orderedNodeLists = new 
ArrayList<>();
-        
-        for ( final ControllerServiceNode node : serviceNodeMap.values() ) {
-            if ( orderedNodeLists.contains(node) ) {
+
+        for (final ControllerServiceNode node : serviceNodeMap.values()) {
+            if (orderedNodeLists.contains(node)) {
                 continue;   // this node is already in the list.
             }
-            
+
             final List<ControllerServiceNode> branch = new ArrayList<>();
             determineEnablingOrder(serviceNodeMap, node, branch, new 
HashSet<ControllerServiceNode>());
             orderedNodeLists.add(branch);
         }
-        
+
         return orderedNodeLists;
     }
-    
-    
-    private static void determineEnablingOrder(final Map<String, 
ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, 
final List<ControllerServiceNode> orderedNodes, final 
Set<ControllerServiceNode> visited) {
-        if ( visited.contains(contextNode) ) {
+
+    private static void determineEnablingOrder(
+            final Map<String, ControllerServiceNode> serviceNodeMap,
+            final ControllerServiceNode contextNode,
+            final List<ControllerServiceNode> orderedNodes,
+            final Set<ControllerServiceNode> visited) {
+        if (visited.contains(contextNode)) {
             return;
         }
-        
-        for ( final Map.Entry<PropertyDescriptor, String> entry : 
contextNode.getProperties().entrySet() ) {
-            if ( entry.getKey().getControllerServiceDefinition() != null ) {
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
contextNode.getProperties().entrySet()) {
+            if (entry.getKey().getControllerServiceDefinition() != null) {
                 final String referencedServiceId = entry.getValue();
-                if ( referencedServiceId != null ) {
+                if (referencedServiceId != null) {
                     final ControllerServiceNode referencedNode = 
serviceNodeMap.get(referencedServiceId);
-                    if ( !orderedNodes.contains(referencedNode) ) {
+                    if (!orderedNodes.contains(referencedNode)) {
                         visited.add(contextNode);
                         determineEnablingOrder(serviceNodeMap, referencedNode, 
orderedNodes, visited);
                     }
@@ -427,12 +425,11 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
             }
         }
 
-        if ( !orderedNodes.contains(contextNode) ) {
+        if (!orderedNodes.contains(contextNode)) {
             orderedNodes.add(contextNode);
         }
     }
-    
-    
+
     @Override
     public void disableControllerService(final ControllerServiceNode 
serviceNode) {
         serviceNode.verifyCanDisable();
@@ -461,7 +458,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         final ControllerServiceNode node = 
controllerServices.get(serviceIdentifier);
         return (node == null) ? false : (ControllerServiceState.ENABLING == 
node.getState());
     }
-    
+
     @Override
     public ControllerServiceNode getControllerServiceNode(final String 
serviceIdentifier) {
         return controllerServices.get(serviceIdentifier);
@@ -478,157 +475,158 @@ public class StandardControllerServiceProvider 
implements ControllerServiceProvi
 
         return identifiers;
     }
-    
+
     @Override
     public String getControllerServiceName(final String serviceIdentifier) {
-       final ControllerServiceNode node = 
getControllerServiceNode(serviceIdentifier);
-       return node == null ? null : node.getName();
+        final ControllerServiceNode node = 
getControllerServiceNode(serviceIdentifier);
+        return node == null ? null : node.getName();
     }
-    
+
+    @Override
     public void removeControllerService(final ControllerServiceNode 
serviceNode) {
         final ControllerServiceNode existing = 
controllerServices.get(serviceNode.getIdentifier());
-        if ( existing == null || existing != serviceNode ) {
+        if (existing == null || existing != serviceNode) {
             throw new IllegalStateException("Controller Service " + 
serviceNode + " does not exist in this Flow");
         }
-        
+
         serviceNode.verifyCanDelete();
-        
+
         try (final NarCloseable x = NarCloseable.withNarLoader()) {
             final ConfigurationContext configurationContext = new 
StandardConfigurationContext(serviceNode, this);
             
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, 
serviceNode.getControllerServiceImplementation(), configurationContext);
         }
-        
-        for ( final Map.Entry<PropertyDescriptor, String> entry : 
serviceNode.getProperties().entrySet() ) {
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
serviceNode.getProperties().entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
-            if (descriptor.getControllerServiceDefinition() != null ) {
+            if (descriptor.getControllerServiceDefinition() != null) {
                 final String value = entry.getValue() == null ? 
descriptor.getDefaultValue() : entry.getValue();
-                if ( value != null ) {
+                if (value != null) {
                     final ControllerServiceNode referencedNode = 
getControllerServiceNode(value);
-                    if ( referencedNode != null ) {
+                    if (referencedNode != null) {
                         referencedNode.removeReference(serviceNode);
                     }
                 }
             }
         }
-        
+
         controllerServices.remove(serviceNode.getIdentifier());
     }
-    
+
     @Override
     public Set<ControllerServiceNode> getAllControllerServices() {
-       return new HashSet<>(controllerServices.values());
+        return new HashSet<>(controllerServices.values());
     }
-    
-    
+
     /**
-     * Returns a List of all components that reference the given 
referencedNode (either directly or indirectly through
-     * another service) that are also of the given componentType. The list 
that is returned is in the order in which they will
-     * need to be 'activated' (enabled/started).
-     * @param referencedNode
-     * @param componentType
-     * @return
+     * Returns a List of all components that reference the given referencedNode
+     * (either directly or indirectly through another service) that are also of
+     * the given componentType. The list that is returned is in the order in
+     * which they will need to be 'activated' (enabled/started).
+     *
+     * @param referencedNode node
+     * @param componentType type
+     * @return list of components
      */
     private <T> List<T> findRecursiveReferences(final ControllerServiceNode 
referencedNode, final Class<T> componentType) {
         final List<T> references = new ArrayList<>();
-        
-        for ( final ConfiguredComponent referencingComponent : 
referencedNode.getReferences().getReferencingComponents() ) {
-            if ( 
componentType.isAssignableFrom(referencingComponent.getClass()) ) {
+
+        for (final ConfiguredComponent referencingComponent : 
referencedNode.getReferences().getReferencingComponents()) {
+            if 
(componentType.isAssignableFrom(referencingComponent.getClass())) {
                 references.add(componentType.cast(referencingComponent));
             }
-            
-            if ( referencingComponent instanceof ControllerServiceNode ) {
+
+            if (referencingComponent instanceof ControllerServiceNode) {
                 final ControllerServiceNode referencingNode = 
(ControllerServiceNode) referencingComponent;
-                
+
                 // find components recursively that depend on referencingNode.
                 final List<T> recursive = 
findRecursiveReferences(referencingNode, componentType);
-                
+
                 // For anything that depends on referencing node, we want to 
add it to the list, but we know
                 // that it must come after the referencing node, so we first 
remove any existing occurrence.
                 references.removeAll(recursive);
                 references.addAll(recursive);
             }
         }
-        
+
         return references;
     }
 
-    
     @Override
     public void enableReferencingServices(final ControllerServiceNode 
serviceNode) {
         final List<ControllerServiceNode> recursiveReferences = 
findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         enableReferencingServices(serviceNode, recursiveReferences);
     }
-    
+
     private void enableReferencingServices(final ControllerServiceNode 
serviceNode, final List<ControllerServiceNode> recursiveReferences) {
-        if ( serviceNode.getState() != ControllerServiceState.ENABLED && 
serviceNode.getState() != ControllerServiceState.ENABLING ) {
+        if (serviceNode.getState() != ControllerServiceState.ENABLED && 
serviceNode.getState() != ControllerServiceState.ENABLING) {
             serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
         }
-        
+
         final Set<ControllerServiceNode> ifEnabled = new HashSet<>();
         final List<ControllerServiceNode> toEnable = 
findRecursiveReferences(serviceNode, ControllerServiceNode.class);
-        for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+        for (final ControllerServiceNode nodeToEnable : toEnable) {
             final ControllerServiceState state = nodeToEnable.getState();
-            if ( state != ControllerServiceState.ENABLED && state != 
ControllerServiceState.ENABLING ) {
+            if (state != ControllerServiceState.ENABLED && state != 
ControllerServiceState.ENABLING) {
                 nodeToEnable.verifyCanEnable(ifEnabled);
                 ifEnabled.add(nodeToEnable);
             }
         }
-        
-        for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+
+        for (final ControllerServiceNode nodeToEnable : toEnable) {
             final ControllerServiceState state = nodeToEnable.getState();
-            if ( state != ControllerServiceState.ENABLED && state != 
ControllerServiceState.ENABLING ) {
+            if (state != ControllerServiceState.ENABLED && state != 
ControllerServiceState.ENABLING) {
                 enableControllerService(nodeToEnable);
             }
         }
     }
-    
+
     @Override
     public void verifyCanEnableReferencingServices(final ControllerServiceNode 
serviceNode) {
         final List<ControllerServiceNode> referencingServices = 
findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         final Set<ControllerServiceNode> referencingServiceSet = new 
HashSet<>(referencingServices);
-        
-        for ( final ControllerServiceNode referencingService : 
referencingServices ) {
+
+        for (final ControllerServiceNode referencingService : 
referencingServices) {
             referencingService.verifyCanEnable(referencingServiceSet);
         }
     }
-    
+
     @Override
     public void verifyCanScheduleReferencingComponents(final 
ControllerServiceNode serviceNode) {
         final List<ControllerServiceNode> referencingServices = 
findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         final List<ReportingTaskNode> referencingReportingTasks = 
findRecursiveReferences(serviceNode, ReportingTaskNode.class);
         final List<ProcessorNode> referencingProcessors = 
findRecursiveReferences(serviceNode, ProcessorNode.class);
-        
+
         final Set<ControllerServiceNode> referencingServiceSet = new 
HashSet<>(referencingServices);
-        
-        for ( final ReportingTaskNode taskNode : referencingReportingTasks ) {
-            if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+
+        for (final ReportingTaskNode taskNode : referencingReportingTasks) {
+            if (taskNode.getScheduledState() != ScheduledState.DISABLED) {
                 taskNode.verifyCanStart(referencingServiceSet);
             }
         }
-        
-        for ( final ProcessorNode procNode : referencingProcessors ) {
-            if ( procNode.getScheduledState() != ScheduledState.DISABLED ) {
+
+        for (final ProcessorNode procNode : referencingProcessors) {
+            if (procNode.getScheduledState() != ScheduledState.DISABLED) {
                 procNode.verifyCanStart(referencingServiceSet);
             }
         }
     }
-    
+
     @Override
     public void verifyCanDisableReferencingServices(final 
ControllerServiceNode serviceNode) {
         // Get a list of all Controller Services that need to be disabled, in 
the order that they need to be
         // disabled.
         final List<ControllerServiceNode> toDisable = 
findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
-        
-        for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+
+        for (final ControllerServiceNode nodeToDisable : toDisable) {
             final ControllerServiceState state = nodeToDisable.getState();
-            
-            if ( state != ControllerServiceState.DISABLED && state != 
ControllerServiceState.DISABLING ) {
+
+            if (state != ControllerServiceState.DISABLED && state != 
ControllerServiceState.DISABLING) {
                 nodeToDisable.verifyCanDisable(serviceSet);
             }
         }
     }
-    
+
     @Override
     public void verifyCanStopReferencingComponents(final ControllerServiceNode 
serviceNode) {
         // we can always stop referencing components

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index c470b99..701adcf 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -65,9 +65,9 @@ public class StandardControllerServiceReference implements 
ControllerServiceRefe
         for (final ConfiguredComponent component : components) {
             if (component instanceof ControllerServiceNode) {
                 serviceNodes.add((ControllerServiceNode) component);
-                
+
                 final ControllerServiceState state = ((ControllerServiceNode) 
component).getState();
-                if ( state != ControllerServiceState.DISABLED ) {
+                if (state != ControllerServiceState.DISABLED) {
                     activeReferences.add(component);
                 }
             } else if (isRunning(component)) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
index 89ac846..6970fce 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
@@ -31,7 +31,8 @@ public class StandardMetricDescriptor<T> implements 
MetricDescriptor<T> {
         this(field, label, description, formatter, valueFunction, null);
     }
 
-    public StandardMetricDescriptor(final String field, final String label, 
final String description, final MetricDescriptor.Formatter formatter, final 
ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) 
{
+    public StandardMetricDescriptor(final String field, final String label, 
final String description,
+            final MetricDescriptor.Formatter formatter, final ValueMapper<T> 
valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
         this.field = field;
         this.label = label;
         this.description = description;
@@ -40,41 +41,21 @@ public class StandardMetricDescriptor<T> implements 
MetricDescriptor<T> {
         this.reducer = reducer == null ? new SumReducer() : reducer;
     }
 
-    /**
-     * The name of this status field.
-     *
-     * @return
-     */
     @Override
     public String getField() {
         return field;
     }
 
-    /**
-     * The label of this status field.
-     *
-     * @return
-     */
     @Override
     public String getLabel() {
         return label;
     }
 
-    /**
-     * The description of this status field.
-     *
-     * @return
-     */
     @Override
     public String getDescription() {
         return description;
     }
 
-    /**
-     * The formatter for this descriptor.
-     *
-     * @return
-     */
     @Override
     public MetricDescriptor.Formatter getFormatter() {
         return formatter;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 0872192..d2a983a 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -91,7 +91,8 @@ public class VolatileComponentStatusRepository implements 
ComponentStatusReposit
 
     @Override
     public synchronized void capture(final ProcessGroupStatus rootGroupStatus, 
final Date timestamp) {
-        captures.add(new Capture(timestamp, 
ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, 
ComponentType.PROCESSOR, ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, 
ComponentType.REMOTE_PROCESS_GROUP)));
+        captures.add(new Capture(timestamp, 
ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, 
ComponentType.PROCESSOR,
+                ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, 
ComponentType.REMOTE_PROCESS_GROUP)));
         logger.debug("Captured metrics for {}", this);
         lastCaptureTime = Math.max(lastCaptureTime, timestamp.getTime());
     }
@@ -269,48 +270,57 @@ public class VolatileComponentStatusRepository implements 
ComponentStatusReposit
 
     public static enum RemoteProcessGroupStatusDescriptor {
 
-        SENT_BYTES(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 
mins)", "The cumulative size of all FlowFiles that have been successfully sent 
to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return status.getSentContentSize();
-            }
-        })),
-        SENT_COUNT(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent 
(5 mins)", "The number of FlowFiles that have been successfully sent to the 
remote system in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return Long.valueOf(status.getSentCount().longValue());
-            }
-        })),
-        RECEIVED_BYTES(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes 
Received (5 mins)", "The cumulative size of all FlowFiles that have been 
received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, 
new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return status.getReceivedContentSize();
-            }
-        })),
-        RECEIVED_COUNT(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles 
Received (5 mins)", "The number of FlowFiles that have been received from the 
remote system in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return Long.valueOf(status.getReceivedCount().longValue());
-            }
-        })),
-        RECEIVED_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", 
"Received Bytes Per Second", "The data rate at which data was received from the 
remote system in the past 5 minutes in terms of Bytes Per Second", 
Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return 
Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
-            }
-        })),
-        SENT_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent 
Bytes Per Second", "The data rate at which data was received from the remote 
system in the past 5 minutes in terms of Bytes Per Second", 
Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return Long.valueOf(status.getSentContentSize().longValue() / 
300L);
-            }
-        })),
-        TOTAL_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", 
"Total Bytes Per Second", "The sum of the send and receive data rate from the 
remote system in the past 5 minutes in terms of Bytes Per Second", 
Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return 
Long.valueOf((status.getReceivedContentSize().longValue() + 
status.getSentContentSize().longValue()) / 300L);
-            }
-        })),
+        SENT_BYTES(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 
mins)",
+                "The cumulative size of all FlowFiles that have been 
successfully sent to the remote system in the past 5 minutes", 
Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus 
status) {
+                        return status.getSentContentSize();
+                    }
+                })),
+        SENT_COUNT(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent 
(5 mins)",
+                "The number of FlowFiles that have been successfully sent to 
the remote system in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus 
status) {
+                        return Long.valueOf(status.getSentCount().longValue());
+                    }
+                })),
+        RECEIVED_BYTES(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes 
Received (5 mins)",
+                "The cumulative size of all FlowFiles that have been received 
from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus 
status) {
+                        return status.getReceivedContentSize();
+                    }
+                })),
+        RECEIVED_COUNT(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles 
Received (5 mins)",
+                "The number of FlowFiles that have been received from the 
remote system in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus 
status) {
+                        return 
Long.valueOf(status.getReceivedCount().longValue());
+                    }
+                })),
+        RECEIVED_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", 
"Received Bytes Per Second",
+                "The data rate at which data was received from the remote 
system in the past 5 minutes in terms of Bytes Per Second",
+                Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus 
status) {
+                        return 
Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
+                    }
+                })),
+        SENT_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent 
Bytes Per Second",
+                "The data rate at which data was sent to the remote system in the past 5 minutes in terms of Bytes Per Second", 
Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus 
status) {
+                        return 
Long.valueOf(status.getSentContentSize().longValue() / 300L);
+                    }
+                })),
+        TOTAL_BYTES_PER_SECOND(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", 
"Total Bytes Per Second",
+                "The sum of the send and receive data rate from the remote 
system in the past 5 minutes in terms of Bytes Per Second",
+                Formatter.DATA_SIZE, new 
ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus 
status) {
+                        return 
Long.valueOf((status.getReceivedContentSize().longValue() + 
status.getSentContentSize().longValue()) / 300L);
+                    }
+                })),
         AVERAGE_LINEAGE_DURATION(new 
StandardMetricDescriptor<RemoteProcessGroupStatus>(
                 "averageLineageDuration",
                 "Average Lineage Duration (5 mins)",
@@ -358,66 +368,83 @@ public class VolatileComponentStatusRepository implements 
ComponentStatusReposit
 
     public static enum ProcessGroupStatusDescriptor {
 
-        BYTES_READ(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 
mins)", "The total number of bytes read from Content Repository by Processors 
in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getBytesRead();
-            }
-        })),
-        BYTES_WRITTEN(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 
mins)", "The total number of bytes written to Content Repository by Processors 
in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getBytesWritten();
-            }
-        })),
-        BYTES_TRANSFERRED(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes 
Transferred (5 mins)", "The total number of bytes read from or written to 
Content Repository by Processors in this Process Group in the past 5 minutes", 
Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getBytesRead() + status.getBytesWritten();
-            }
-        })),
-        INPUT_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", 
"The cumulative size of all FlowFiles that have entered this Process Group via 
its Input Ports in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getInputContentSize();
-            }
-        })),
-        INPUT_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 
mins)", "The number of FlowFiles that have entered this Process Group via its 
Input Ports in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getInputCount().longValue();
-            }
-        })),
-        OUTPUT_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 
mins)", "The cumulative size of all FlowFiles that have exited this Process 
Group via its Output Ports in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getOutputContentSize();
-            }
-        })),
-        OUTPUT_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 
mins)", "The number of FlowFiles that have exited this Process Group via its 
Output Ports in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getOutputCount().longValue();
-            }
-        })),
-        QUEUED_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", 
"The cumulative size of all FlowFiles queued in all Connections of this Process 
Group", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getQueuedContentSize();
-            }
-        })),
-        QUEUED_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", 
"The number of FlowFiles queued in all Connections of this Process Group", 
Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getQueuedCount().longValue();
-            }
-        })),
-        TASK_MILLIS(new 
StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration 
(5 mins)", "The total number of thread-milliseconds that the Processors within 
this ProcessGroup have used to complete their tasks in the past 5 minutes", 
Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return calculateTaskMillis(status);
-            }
-        }));
+        BYTES_READ(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)",
+                "The total number of bytes read from Content Repository by 
Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, 
new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getBytesRead();
+                    }
+                })),
+        BYTES_WRITTEN(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 
mins)",
+                "The total number of bytes written to Content Repository by 
Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, 
new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getBytesWritten();
+                    }
+                })),
+        BYTES_TRANSFERRED(new 
StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes 
Transferred (5 mins)",
+                "The total number of bytes read from or written to Content 
Repository by Processors in this Process Group in the past 5 minutes",
+                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getBytesRead() + 
status.getBytesWritten();
+                    }
+                })),
+        INPUT_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)",
+                "The cumulative size of all FlowFiles that have entered this 
Process Group via its Input Ports in the past 5 minutes",
+                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getInputContentSize();
+                    }
+                })),
+        INPUT_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 
mins)",
+                "The number of FlowFiles that have entered this Process Group 
via its Input Ports in the past 5 minutes",
+                Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getInputCount().longValue();
+                    }
+                })),
+        OUTPUT_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 
mins)",
+                "The cumulative size of all FlowFiles that have exited this 
Process Group via its Output Ports in the past 5 minutes",
+                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getOutputContentSize();
+                    }
+                })),
+        OUTPUT_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 
mins)",
+                "The number of FlowFiles that have exited this Process Group 
via its Output Ports in the past 5 minutes",
+                Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getOutputCount().longValue();
+                    }
+                })),
+        QUEUED_BYTES(new 
StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
+                "The cumulative size of all FlowFiles queued in all 
Connections of this Process Group",
+                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getQueuedContentSize();
+                    }
+                })),
+        QUEUED_COUNT(new 
StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
+                "The number of FlowFiles queued in all Connections of this 
Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getQueuedCount().longValue();
+                    }
+                })),
+        TASK_MILLIS(new 
StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration 
(5 mins)",
+                "The total number of thread-milliseconds that the Processors 
within this ProcessGroup have used to complete their tasks in the past 5 
minutes",
+                Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return calculateTaskMillis(status);
+                    }
+                }));
 
         private MetricDescriptor<ProcessGroupStatus> descriptor;
 
@@ -436,42 +463,48 @@ public class VolatileComponentStatusRepository implements 
ComponentStatusReposit
 
     public static enum ConnectionStatusDescriptor {
 
-        INPUT_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)", 
"The cumulative size of all FlowFiles that were transferred to this Connection 
in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return status.getInputBytes();
-            }
-        })),
-        INPUT_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 
mins)", "The number of FlowFiles that were transferred to this Connection in 
the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return Long.valueOf(status.getInputCount());
-            }
-        })),
-        OUTPUT_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)", 
"The cumulative size of all FlowFiles that were pulled from this Connection in 
the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return status.getOutputBytes();
-            }
-        })),
-        OUTPUT_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 
mins)", "The number of FlowFiles that were pulled from this Connection in the 
past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return Long.valueOf(status.getOutputCount());
-            }
-        })),
-        QUEUED_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes", "The 
number of Bytes queued in this Connection", Formatter.DATA_SIZE, new 
ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return status.getQueuedBytes();
-            }
-        })),
-        QUEUED_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count", "The 
number of FlowFiles queued in this Connection", Formatter.COUNT, new 
ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return Long.valueOf(status.getQueuedCount());
-            }
-        }));
+        INPUT_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)",
+                "The cumulative size of all FlowFiles that were transferred to 
this Connection in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return status.getInputBytes();
+                    }
+                })),
+        INPUT_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 
mins)",
+                "The number of FlowFiles that were transferred to this 
Connection in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return Long.valueOf(status.getInputCount());
+                    }
+                })),
+        OUTPUT_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)",
+                "The cumulative size of all FlowFiles that were pulled from 
this Connection in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return status.getOutputBytes();
+                    }
+                })),
+        OUTPUT_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 
mins)",
+                "The number of FlowFiles that were pulled from this Connection 
in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return Long.valueOf(status.getOutputCount());
+                    }
+                })),
+        QUEUED_BYTES(new 
StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes",
+                "The number of Bytes queued in this Connection", 
Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return status.getQueuedBytes();
+                    }
+                })),
+        QUEUED_COUNT(new 
StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count",
+                "The number of FlowFiles queued in this Connection", 
Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return Long.valueOf(status.getQueuedCount());
+                    }
+                }));
 
         private MetricDescriptor<ConnectionStatus> descriptor;
 
@@ -490,66 +523,76 @@ public class VolatileComponentStatusRepository implements 
ComponentStatusReposit
 
     public static enum ProcessorStatusDescriptor {
 
-        BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", 
"Bytes Read (5 mins)", "The total number of bytes read from the Content 
Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getBytesRead();
-            }
-        })),
-        BYTES_WRITTEN(new 
StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 
mins)", "The total number of bytes written to the Content Repository by this 
Processor in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getBytesWritten();
-            }
-        })),
-        BYTES_TRANSFERRED(new 
StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes 
Transferred (5 mins)", "The total number of bytes read from or written to the 
Content Repository by this Processor in the past 5 minutes", 
Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getBytesRead() + status.getBytesWritten();
-            }
-        })),
-        INPUT_BYTES(new 
StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)", 
"The cumulative size of all FlowFiles that this Processor has pulled from its 
queues in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getInputBytes();
-            }
-        })),
-        INPUT_COUNT(new 
StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 
mins)", "The number of FlowFiles that this Processor has pulled from its queues 
in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return Long.valueOf(status.getInputCount());
-            }
-        })),
-        OUTPUT_BYTES(new 
StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)", 
"The cumulative size of all FlowFiles that this Processor has transferred to 
downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getOutputBytes();
-            }
-        })),
-        OUTPUT_COUNT(new 
StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 
mins)", "The number of FlowFiles that this Processor has transferred to 
downstream queues in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return Long.valueOf(status.getOutputCount());
-            }
-        })),
-        TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", 
"Tasks (5 mins)", "The number of tasks that this Processor has completed in the 
past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return Long.valueOf(status.getInvocations());
-            }
-        })),
-        TASK_MILLIS(new 
StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 
mins)", "The total number of thread-milliseconds that the Processor has used to 
complete its tasks in the past 5 minutes", Formatter.DURATION, new 
ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return 
TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), 
TimeUnit.NANOSECONDS);
-            }
-        })),
-        FLOWFILES_REMOVED(new 
StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles 
Removed (5 mins)", "The total number of FlowFiles removed by this Processor in 
the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return Long.valueOf(status.getFlowFilesRemoved());
-            }
-        })),
+        BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", 
"Bytes Read (5 mins)",
+                "The total number of bytes read from the Content Repository by 
this Processor in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getBytesRead();
+                    }
+                })),
+        BYTES_WRITTEN(new 
StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 
mins)",
+                "The total number of bytes written to the Content Repository 
by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getBytesWritten();
+                    }
+                })),
+        BYTES_TRANSFERRED(new 
StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes 
Transferred (5 mins)",
+                "The total number of bytes read from or written to the Content 
Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getBytesRead() + 
status.getBytesWritten();
+                    }
+                })),
+        INPUT_BYTES(new 
StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)",
+                "The cumulative size of all FlowFiles that this Processor has 
pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new 
ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getInputBytes();
+                    }
+                })),
+        INPUT_COUNT(new 
StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)",
+                "The number of FlowFiles that this Processor has pulled from 
its queues in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return Long.valueOf(status.getInputCount());
+                    }
+                })),
+        OUTPUT_BYTES(new 
StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)",
+                "The cumulative size of all FlowFiles that this Processor has 
transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, 
new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getOutputBytes();
+                    }
+                })),
+        OUTPUT_COUNT(new 
StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 
mins)",
+                "The number of FlowFiles that this Processor has transferred 
to downstream queues in the past 5 minutes", Formatter.COUNT, new 
ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return Long.valueOf(status.getOutputCount());
+                    }
+                })),
+        TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", 
"Tasks (5 mins)", "The number of tasks that this Processor has completed in the 
past 5 minutes",
+                Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return Long.valueOf(status.getInvocations());
+                    }
+                })),
+        TASK_MILLIS(new 
StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 
mins)",
+                "The total number of thread-milliseconds that the Processor 
has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new 
ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return 
TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), 
TimeUnit.NANOSECONDS);
+                    }
+                })),
+        FLOWFILES_REMOVED(new 
StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles 
Removed (5 mins)",
+                "The total number of FlowFiles removed by this Processor in 
the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return Long.valueOf(status.getFlowFilesRemoved());
+                    }
+                })),
         AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
                 "averageLineageDuration",
                 "Average Lineage Duration (5 mins)",

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index 5ecd22e..f3cbb90 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@ -35,8 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Continually runs a Connectable as long as the processor has work to do. 
{@link #call()} will return
- * <code>true</code> if the Connectable should be yielded, <code>false</code> 
otherwise.
+ * Continually runs a Connectable as long as the processor has work to do.
+ * {@link #call()} will return <code>true</code> if the Connectable should be
+ * yielded, <code>false</code> otherwise.
  */
 public class ContinuallyRunConnectableTask implements Callable<Boolean> {
 
@@ -60,7 +61,7 @@ public class ContinuallyRunConnectableTask implements 
Callable<Boolean> {
         if (!scheduleState.isScheduled()) {
             return false;
         }
-        
+
         // Connectable should run if the following conditions are met:
         // 1. It is not yielded.
         // 2. It has incoming connections with FlowFiles queued or doesn't 
expect incoming connections
@@ -106,7 +107,7 @@ public class ContinuallyRunConnectableTask implements 
Callable<Boolean> {
             // yield for just a bit.
             return true;
         }
-        
-        return false;  // do not yield
+
+        return false; // do not yield
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index cff8744..baed6ae 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -43,10 +43,10 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * Continually runs a processor as long as the processor has work to do. 
{@link #call()} will return
- * <code>true</code> if the processor should be yielded, <code>false</code> 
otherwise.
+ * Continually runs a processor as long as the processor has work to do.
+ * {@link #call()} will return <code>true</code> if the processor should be
+ * yielded, <code>false</code> otherwise.
  */
 public class ContinuallyRunProcessorTask implements Callable<Boolean> {
 
@@ -61,7 +61,7 @@ public class ContinuallyRunProcessorTask implements 
Callable<Boolean> {
     private final int numRelationships;
 
     public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, 
final ProcessorNode procNode,
-            final FlowController flowController, final ProcessContextFactory 
contextFactory, final ScheduleState scheduleState, 
+            final FlowController flowController, final ProcessContextFactory 
contextFactory, final ScheduleState scheduleState,
             final StandardProcessContext processContext) {
 
         this.schedulingAgent = schedulingAgent;
@@ -163,9 +163,9 @@ public class ContinuallyRunProcessorTask implements 
Callable<Boolean> {
                 if (batch) {
                     rawSession.commit();
                 }
-    
+
                 final long processingNanos = System.nanoTime() - startNanos;
-    
+
                 // if the processor is no longer scheduled to run and this is 
the last thread,
                 // invoke the OnStopped methods
                 if (!scheduleState.isScheduled() && 
scheduleState.getActiveThreadCount() == 1 && 
scheduleState.mustCallOnStoppedMethods()) {
@@ -174,7 +174,7 @@ public class ContinuallyRunProcessorTask implements 
Callable<Boolean> {
                         flowController.heartbeat();
                     }
                 }
-    
+
                 try {
                     final StandardFlowFileEvent procEvent = new 
StandardFlowFileEvent(procNode.getIdentifier());
                     procEvent.setProcessingNanos(processingNanos);
@@ -188,7 +188,7 @@ public class ContinuallyRunProcessorTask implements 
Callable<Boolean> {
                 scheduleState.decrementActiveThreadCount();
             }
         }
-        
+
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
index 0c472c8..5724bb4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
@@ -42,7 +42,7 @@ public class ReportingTaskWrapper implements Runnable {
             
taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
         } catch (final Throwable t) {
             final ComponentLog componentLog = new 
SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask());
-            componentLog.error("Error running task {} due to {}", new Object[] 
{taskNode.getReportingTask(), t.toString()});
+            componentLog.error("Error running task {} due to {}", new 
Object[]{taskNode.getReportingTask(), t.toString()});
             if (componentLog.isDebugEnabled()) {
                 componentLog.error("", t);
             }
@@ -52,7 +52,9 @@ public class ReportingTaskWrapper implements Runnable {
                 // invoke the OnStopped methods
                 if (!scheduleState.isScheduled() && 
scheduleState.getActiveThreadCount() == 1 && 
scheduleState.mustCallOnStoppedMethods()) {
                     try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                        
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
org.apache.nifi.processor.annotation.OnStopped.class, 
taskNode.getReportingTask(), taskNode.getConfigurationContext());
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(
+                                OnStopped.class, 
org.apache.nifi.processor.annotation.OnStopped.class,
+                                taskNode.getReportingTask(), 
taskNode.getConfigurationContext());
                     }
                 }
             } finally {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
index be79c5b..fccd10e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
@@ -76,7 +76,7 @@ public final class StringEncryptor {
      * Creates an instance of the nifi sensitive property encryptor. Validates
      * that the encryptor is actually working.
      *
-     * @return
+     * @return encryptor
      * @throws EncryptionException if any issues arise initializing or
      * validating the encryptor
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
index 76e8e3e..3be178f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
@@ -27,9 +27,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * @author unattributed
- */
 public final class FlowEngine extends ScheduledThreadPoolExecutor {
 
     private static final Logger logger = 
LoggerFactory.getLogger(FlowEngine.class);
@@ -39,19 +36,20 @@ public final class FlowEngine extends 
ScheduledThreadPoolExecutor {
      *
      * @param corePoolSize the maximum number of threads available to tasks
      * running in the engine.
-     * @param threadNamePrefix
+     * @param threadNamePrefix for naming the thread
      */
     public FlowEngine(int corePoolSize, final String threadNamePrefix) {
-       this(corePoolSize, threadNamePrefix, false);
+        this(corePoolSize, threadNamePrefix, false);
     }
-       
+
     /**
      * Creates a new instance of FlowEngine
      *
      * @param corePoolSize the maximum number of threads available to tasks
      * running in the engine.
-     * @param threadNamePrefix
-     * @param deamon if true, the thread pool will be populated with daemon 
threads, otherwise the threads will not be marked as daemon.
+     * @param threadNamePrefix for thread naming
+     * @param daemon if true, the thread pool will be populated with daemon
+     * threads, otherwise the threads will not be marked as daemon.
      */
     public FlowEngine(int corePoolSize, final String threadNamePrefix, final 
boolean daemon) {
         super(corePoolSize);
@@ -62,8 +60,8 @@ public final class FlowEngine extends 
ScheduledThreadPoolExecutor {
             @Override
             public Thread newThread(final Runnable r) {
                 final Thread t = defaultThreadFactory.newThread(r);
-                if ( daemon ) {
-                       t.setDaemon(true);
+                if (daemon) {
+                    t.setDaemon(true);
                 }
                 t.setName(threadNamePrefix + " Thread-" + 
threadIndex.incrementAndGet());
                 return t;
@@ -75,8 +73,8 @@ public final class FlowEngine extends 
ScheduledThreadPoolExecutor {
      * Hook method called by the running thread whenever a runnable task is
      * given to the thread to run.
      *
-     * @param thread
-     * @param runnable
+     * @param thread thread
+     * @param runnable runnable
      */
     @Override
     protected void beforeExecute(final Thread thread, final Runnable runnable) 
{
@@ -90,8 +88,8 @@ public final class FlowEngine extends 
ScheduledThreadPoolExecutor {
      * execution of the runnable completed. Logs the fact of completion and any
      * errors that might have occurred.
      *
-     * @param runnable
-     * @param throwable
+     * @param runnable runnable
+     * @param throwable throwable
      */
     @Override
     protected void afterExecute(final Runnable runnable, final Throwable 
throwable) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index 044541b..e8708bd 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -180,7 +180,7 @@ public class VolatileBulletinRepository implements 
BulletinRepository {
      * bulletin strategy is employed, bulletins will not be persisted in this
      * repository and will be sent to the specified strategy instead.
      *
-     * @param strategy
+     * @param strategy bulletin strategy
      */
     public void overrideDefaultBulletinProcessing(final 
BulletinProcessingStrategy strategy) {
         Objects.requireNonNull(strategy);

Reply via email to