http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index d8f1338..a45bf76 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -61,9 +61,6 @@ import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ public class StandardControllerServiceProvider implements ControllerServiceProvider { private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class); @@ -112,24 +109,24 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi populateInterfaces(superClass, interfacesDefinedThusFar); } } - + @Override public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { if (type == null || id == null) { throw new NullPointerException(); } - + final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader(); try { final ClassLoader cl = ExtensionManager.getClassLoader(type); final Class<?> rawClass; - if ( cl == null ) { + if (cl == null) { rawClass = Class.forName(type); } else { Thread.currentThread().setContextClassLoader(cl); rawClass = Class.forName(type, false, cl); } - + final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class); final ControllerService originalService = controllerServiceClass.newInstance(); @@ -138,11 +135,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { - final String methodName = method.getName(); - if("initialize".equals(methodName) || "onPropertyModified".equals(methodName)){ - throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework"); - } - + final String methodName = method.getName(); + if ("initialize".equals(methodName) || "onPropertyModified".equals(methodName)) { + throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework"); + } + final ControllerServiceNode node = serviceNodeHolder.get(); final ControllerServiceState state = node.getState(); final boolean disabled = (state != ControllerServiceState.ENABLED); // only allow method call if service state is ENABLED. @@ -166,7 +163,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi }; final ControllerService proxiedService; - if ( cl == null ) { + if (cl == null) { proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler); } else { proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler); @@ -181,8 +178,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this); serviceNodeHolder.set(serviceNode); serviceNode.setName(rawClass.getSimpleName()); - - if ( firstTimeAdded ) { + + if (firstTimeAdded) { try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService); } catch (final Exception e) { @@ -200,226 +197,227 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } } - - - + @Override public void disableReferencingServices(final ControllerServiceNode serviceNode) { // Get a list of all Controller Services that need to be disabled, in the order that they need to be // disabled. final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class); final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable); - - for ( final ControllerServiceNode nodeToDisable : toDisable ) { + + for (final ControllerServiceNode nodeToDisable : toDisable) { final ControllerServiceState state = nodeToDisable.getState(); - - if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) { + + if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) { nodeToDisable.verifyCanDisable(serviceSet); } } - + Collections.reverse(toDisable); - for ( final ControllerServiceNode nodeToDisable : toDisable ) { + for (final ControllerServiceNode nodeToDisable : toDisable) { final ControllerServiceState state = nodeToDisable.getState(); - - if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) { + + if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) { disableControllerService(nodeToDisable); } } } - - + @Override public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, // or a service that references this controller service, etc. final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class); final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); - + // verify that we can start all components (that are not disabled) before doing anything - for ( final ProcessorNode node : processors ) { - if ( node.getScheduledState() != ScheduledState.DISABLED ) { + for (final ProcessorNode node : processors) { + if (node.getScheduledState() != ScheduledState.DISABLED) { node.verifyCanStart(); } } - for ( final ReportingTaskNode node : reportingTasks ) { - if ( node.getScheduledState() != ScheduledState.DISABLED ) { + for (final ReportingTaskNode node : reportingTasks) { + if (node.getScheduledState() != ScheduledState.DISABLED) { node.verifyCanStart(); } } - + // start all of the components that are not disabled - for ( final ProcessorNode node : processors ) { - if ( node.getScheduledState() != ScheduledState.DISABLED ) { + for (final ProcessorNode node : processors) { + if (node.getScheduledState() != ScheduledState.DISABLED) { node.getProcessGroup().startProcessor(node); } } - for ( final ReportingTaskNode node : reportingTasks ) { - if ( node.getScheduledState() != ScheduledState.DISABLED ) { + for (final ReportingTaskNode node : reportingTasks) { + if (node.getScheduledState() != ScheduledState.DISABLED) { processScheduler.schedule(node); } } } - + @Override public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, // or a service that references this controller service, etc. final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class); final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); - + // verify that we can stop all components (that are running) before doing anything - for ( final ProcessorNode node : processors ) { - if ( node.getScheduledState() == ScheduledState.RUNNING ) { + for (final ProcessorNode node : processors) { + if (node.getScheduledState() == ScheduledState.RUNNING) { node.verifyCanStop(); } } - for ( final ReportingTaskNode node : reportingTasks ) { - if ( node.getScheduledState() == ScheduledState.RUNNING ) { + for (final ReportingTaskNode node : reportingTasks) { + if (node.getScheduledState() == ScheduledState.RUNNING) { node.verifyCanStop(); } } - + // stop all of the components that are running - for ( final ProcessorNode node : processors ) { - if ( node.getScheduledState() == ScheduledState.RUNNING ) { + for (final ProcessorNode node : processors) { + if (node.getScheduledState() == ScheduledState.RUNNING) { node.getProcessGroup().stopProcessor(node); } } - for ( final ReportingTaskNode node : reportingTasks ) { - if ( node.getScheduledState() == ScheduledState.RUNNING ) { + for (final ReportingTaskNode node : reportingTasks) { + if (node.getScheduledState() == ScheduledState.RUNNING) { processScheduler.unschedule(node); } } } - + @Override public void enableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanEnable(); processScheduler.enableControllerService(serviceNode); } - + @Override public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { final Set<ControllerServiceNode> servicesToEnable = new HashSet<>(); // Ensure that all nodes are already disabled - for ( final ControllerServiceNode serviceNode : serviceNodes ) { + for (final ControllerServiceNode serviceNode : serviceNodes) { final ControllerServiceState curState = serviceNode.getState(); - if ( ControllerServiceState.DISABLED.equals(curState) ) { + if (ControllerServiceState.DISABLED.equals(curState)) { servicesToEnable.add(serviceNode); } else { logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState); } } - + // determine the order to load the services. We have to ensure that if service A references service B, then B // is enabled first, and so on. final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>(); - for ( final ControllerServiceNode node : servicesToEnable ) { + for (final ControllerServiceNode node : servicesToEnable) { idToNodeMap.put(node.getIdentifier(), node); } - + // We can have many Controller Services dependent on one another. We can have many of these // disparate lists of Controller Services that are dependent on one another. We refer to each // of these as a branch. final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap); - if ( branches.isEmpty() ) { + if (branches.isEmpty()) { logger.info("No Controller Services to enable"); return; } else { logger.info("Will enable {} Controller Services", servicesToEnable.size()); } - + // Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks // to be valid so that they can be scheduled. - for ( final List<ControllerServiceNode> branch : branches ) { - for ( final ControllerServiceNode nodeToEnable : branch ) { + for (final List<ControllerServiceNode> branch : branches) { + for (final ControllerServiceNode nodeToEnable : branch) { nodeToEnable.setState(ControllerServiceState.ENABLING); } } - + final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>()); final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size())); - for ( final List<ControllerServiceNode> branch : branches ) { + for (final List<ControllerServiceNode> branch : branches) { final Runnable enableBranchRunnable = new Runnable() { @Override public void run() { logger.debug("Enabling Controller Service Branch {}", branch); - - for ( final ControllerServiceNode serviceNode : branch ) { + + for (final ControllerServiceNode serviceNode : branch) { try { - if ( !enabledNodes.contains(serviceNode) ) { + if (!enabledNodes.contains(serviceNode)) { enabledNodes.add(serviceNode); - + logger.info("Enabling {}", serviceNode); try { processScheduler.enableControllerService(serviceNode); } catch (final Exception e) { logger.error("Failed to enable " + serviceNode + " due to " + e); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.error("", e); } - - if ( bulletinRepo != null ) { + + if (bulletinRepo != null) { bulletinRepo.addBulletin(BulletinFactory.createBulletin( - "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); + "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); } } } - + // wait for service to finish enabling. - while ( ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) { + while (ControllerServiceState.ENABLING.equals(serviceNode.getState())) { try { Thread.sleep(100L); - } catch (final InterruptedException ie) {} + } catch (final InterruptedException ie) { + } } - + logger.info("State for {} is now {}", serviceNode, serviceNode.getState()); } catch (final Exception e) { logger.error("Failed to enable {} due to {}", serviceNode, e.toString()); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.error("", e); } } } } }; - + executor.submit(enableBranchRunnable); } - + executor.shutdown(); } - + static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) { final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>(); - - for ( final ControllerServiceNode node : serviceNodeMap.values() ) { - if ( orderedNodeLists.contains(node) ) { + + for (final ControllerServiceNode node : serviceNodeMap.values()) { + if (orderedNodeLists.contains(node)) { continue; // this node is already in the list. } - + final List<ControllerServiceNode> branch = new ArrayList<>(); determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>()); orderedNodeLists.add(branch); } - + return orderedNodeLists; } - - - private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) { - if ( visited.contains(contextNode) ) { + + private static void determineEnablingOrder( + final Map<String, ControllerServiceNode> serviceNodeMap, + final ControllerServiceNode contextNode, + final List<ControllerServiceNode> orderedNodes, + final Set<ControllerServiceNode> visited) { + if (visited.contains(contextNode)) { return; } - - for ( final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet() ) { - if ( entry.getKey().getControllerServiceDefinition() != null ) { + + for (final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet()) { + if (entry.getKey().getControllerServiceDefinition() != null) { final String referencedServiceId = entry.getValue(); - if ( referencedServiceId != null ) { + if (referencedServiceId != null) { final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId); - if ( !orderedNodes.contains(referencedNode) ) { + if (!orderedNodes.contains(referencedNode)) { visited.add(contextNode); determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited); } @@ -427,12 +425,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } - if ( !orderedNodes.contains(contextNode) ) { + if (!orderedNodes.contains(contextNode)) { orderedNodes.add(contextNode); } } - - + @Override public void disableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanDisable(); @@ -461,7 +458,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ControllerServiceNode node = controllerServices.get(serviceIdentifier); return (node == null) ? false : (ControllerServiceState.ENABLING == node.getState()); } - + @Override public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) { return controllerServices.get(serviceIdentifier); @@ -478,157 +475,158 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi return identifiers; } - + @Override public String getControllerServiceName(final String serviceIdentifier) { - final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier); - return node == null ? null : node.getName(); + final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier); + return node == null ? null : node.getName(); } - + + @Override public void removeControllerService(final ControllerServiceNode serviceNode) { final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier()); - if ( existing == null || existing != serviceNode ) { + if (existing == null || existing != serviceNode) { throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow"); } - + serviceNode.verifyCanDelete(); - + try (final NarCloseable x = NarCloseable.withNarLoader()) { final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext); } - - for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) { + + for (final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet()) { final PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.getControllerServiceDefinition() != null ) { + if (descriptor.getControllerServiceDefinition() != null) { final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); - if ( value != null ) { + if (value != null) { final ControllerServiceNode referencedNode = getControllerServiceNode(value); - if ( referencedNode != null ) { + if (referencedNode != null) { referencedNode.removeReference(serviceNode); } } } } - + controllerServices.remove(serviceNode.getIdentifier()); } - + @Override public Set<ControllerServiceNode> getAllControllerServices() { - return new HashSet<>(controllerServices.values()); + return new HashSet<>(controllerServices.values()); } - - + /** - * Returns a List of all components that reference the given referencedNode (either directly or indirectly through - * another service) that are also of the given componentType. The list that is returned is in the order in which they will - * need to be 'activated' (enabled/started). - * @param referencedNode - * @param componentType - * @return + * Returns a List of all components that reference the given referencedNode + * (either directly or indirectly through another service) that are also of + * the given componentType. The list that is returned is in the order in + * which they will need to be 'activated' (enabled/started). + * + * @param referencedNode node + * @param componentType type + * @return list of components */ private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) { final List<T> references = new ArrayList<>(); - - for ( final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents() ) { - if ( componentType.isAssignableFrom(referencingComponent.getClass()) ) { + + for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) { + if (componentType.isAssignableFrom(referencingComponent.getClass())) { references.add(componentType.cast(referencingComponent)); } - - if ( referencingComponent instanceof ControllerServiceNode ) { + + if (referencingComponent instanceof ControllerServiceNode) { final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent; - + // find components recursively that depend on referencingNode. final List<T> recursive = findRecursiveReferences(referencingNode, componentType); - + // For anything that depends on referencing node, we want to add it to the list, but we know // that it must come after the referencing node, so we first remove any existing occurrence. references.removeAll(recursive); references.addAll(recursive); } } - + return references; } - @Override public void enableReferencingServices(final ControllerServiceNode serviceNode) { final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class); enableReferencingServices(serviceNode, recursiveReferences); } - + private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) { - if ( serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING ) { + if (serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING) { serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences)); } - + final Set<ControllerServiceNode> ifEnabled = new HashSet<>(); final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class); - for ( final ControllerServiceNode nodeToEnable : toEnable ) { + for (final ControllerServiceNode nodeToEnable : toEnable) { final ControllerServiceState state = nodeToEnable.getState(); - if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) { + if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) { nodeToEnable.verifyCanEnable(ifEnabled); ifEnabled.add(nodeToEnable); } } - - for ( final ControllerServiceNode nodeToEnable : toEnable ) { + + for (final ControllerServiceNode nodeToEnable : toEnable) { final ControllerServiceState state = nodeToEnable.getState(); - if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) { + if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) { enableControllerService(nodeToEnable); } } } - + @Override public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class); final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices); - - for ( final ControllerServiceNode referencingService : referencingServices ) { + + for (final ControllerServiceNode referencingService : referencingServices) { referencingService.verifyCanEnable(referencingServiceSet); } } - + @Override public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) { final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class); final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class); - + final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices); - - for ( final ReportingTaskNode taskNode : referencingReportingTasks ) { - if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) { + + for (final ReportingTaskNode taskNode : referencingReportingTasks) { + if (taskNode.getScheduledState() != ScheduledState.DISABLED) { taskNode.verifyCanStart(referencingServiceSet); } } - - for ( final ProcessorNode procNode : referencingProcessors ) { - if ( procNode.getScheduledState() != ScheduledState.DISABLED ) { + + for (final ProcessorNode procNode : referencingProcessors) { + if (procNode.getScheduledState() != ScheduledState.DISABLED) { procNode.verifyCanStart(referencingServiceSet); } } } - + @Override public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) { // Get a list of all Controller Services that need to be disabled, in the order that they need to be // disabled. final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class); final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable); - - for ( final ControllerServiceNode nodeToDisable : toDisable ) { + + for (final ControllerServiceNode nodeToDisable : toDisable) { final ControllerServiceState state = nodeToDisable.getState(); - - if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) { + + if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) { nodeToDisable.verifyCanDisable(serviceSet); } } } - + @Override public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) { // we can always stop referencing components
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java index c470b99..701adcf 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java @@ -65,9 +65,9 @@ public class StandardControllerServiceReference implements ControllerServiceRefe for (final ConfiguredComponent component : components) { if (component instanceof ControllerServiceNode) { serviceNodes.add((ControllerServiceNode) component); - + final ControllerServiceState state = ((ControllerServiceNode) component).getState(); - if ( state != ControllerServiceState.DISABLED ) { + if (state != ControllerServiceState.DISABLED) { activeReferences.add(component); } } else if (isRunning(component)) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java index 89ac846..6970fce 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java @@ -31,7 +31,8 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> { this(field, label, description, formatter, valueFunction, null); } - public StandardMetricDescriptor(final String field, final String label, final String description, final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) { + public StandardMetricDescriptor(final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) { this.field = field; this.label = label; this.description = description; @@ -40,41 +41,21 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> { this.reducer = reducer == null ? new SumReducer() : reducer; } - /** - * The name of this status field. - * - * @return - */ @Override public String getField() { return field; } - /** - * The label of this status field. - * - * @return - */ @Override public String getLabel() { return label; } - /** - * The description of this status field. - * - * @return - */ @Override public String getDescription() { return description; } - /** - * The formatter for this descriptor. - * - * @return - */ @Override public MetricDescriptor.Formatter getFormatter() { return formatter; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java index 0872192..d2a983a 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java @@ -91,7 +91,8 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit @Override public synchronized void capture(final ProcessGroupStatus rootGroupStatus, final Date timestamp) { - captures.add(new Capture(timestamp, ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR, ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP))); + captures.add(new Capture(timestamp, ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR, + ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP))); logger.debug("Captured metrics for {}", this); lastCaptureTime = Math.max(lastCaptureTime, timestamp.getTime()); } @@ -269,48 +270,57 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit public static enum RemoteProcessGroupStatusDescriptor { - SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getSentContentSize(); - } - })), - SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getSentCount().longValue()); - } - })), - RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getReceivedContentSize(); - } - })), - RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getReceivedCount().longValue()); - } - })), - RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getReceivedContentSize().longValue() / 300L); - } - })), - SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getSentContentSize().longValue() / 300L); - } - })), - TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); - } - })), + SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", + "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return status.getSentContentSize(); + } + })), + SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", + "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf(status.getSentCount().longValue()); + } + })), + RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", + "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return status.getReceivedContentSize(); + } + })), + RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", + "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf(status.getReceivedCount().longValue()); + } + })), + RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", + "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", + Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf(status.getReceivedContentSize().longValue() / 300L); + } + })), + SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", + "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf(status.getSentContentSize().longValue() / 300L); + } + })), + TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", + "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", + Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); + } + })), AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>( "averageLineageDuration", "Average Lineage Duration (5 mins)", @@ -358,66 +368,83 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit public static enum ProcessGroupStatusDescriptor { - BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesRead(); - } - })), - BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesWritten(); - } - })), - BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesRead() + status.getBytesWritten(); - } - })), - INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getInputContentSize(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getInputCount().longValue(); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getOutputContentSize(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getOutputCount().longValue(); - } - })), - QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", "The cumulative size of all FlowFiles queued in all Connections of this Process Group", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getQueuedContentSize(); - } - })), - QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getQueuedCount().longValue(); - } - })), - TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return calculateTaskMillis(status); - } - })); + BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", + "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getBytesRead(); + } + })), + BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", + "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getBytesWritten(); + } + })), + BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", + "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", + Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getBytesRead() + status.getBytesWritten(); + } + })), + INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", + "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", + Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getInputContentSize(); + } + })), + INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", + "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", + Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getInputCount().longValue(); + } + })), + OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", + "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", + Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getOutputContentSize(); + } + })), + OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", + "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", + Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getOutputCount().longValue(); + } + })), + QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", + "The cumulative size of all FlowFiles queued in all Connections of this Process Group", + Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getQueuedContentSize(); + } + })), + QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", + "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getQueuedCount().longValue(); + } + })), + TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", + "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", + Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return calculateTaskMillis(status); + } + })); private MetricDescriptor<ProcessGroupStatus> descriptor; @@ -436,42 +463,48 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit public static enum ConnectionStatusDescriptor { - INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getInputBytes(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getInputCount()); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getOutputBytes(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getOutputCount()); - } - })), - QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes", "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getQueuedBytes(); - } - })), - QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count", "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getQueuedCount()); - } - })); + INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)", + "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return status.getInputBytes(); + } + })), + INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)", + "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return Long.valueOf(status.getInputCount()); + } + })), + OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)", + "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return status.getOutputBytes(); + } + })), + OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)", + "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return Long.valueOf(status.getOutputCount()); + } + })), + QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes", + "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return status.getQueuedBytes(); + } + })), + QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count", + "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return Long.valueOf(status.getQueuedCount()); + } + })); private MetricDescriptor<ConnectionStatus> descriptor; @@ -490,66 +523,76 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit public static enum ProcessorStatusDescriptor { - BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesRead(); - } - })), - BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesWritten(); - } - })), - BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesRead() + status.getBytesWritten(); - } - })), - INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getInputBytes(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getInputCount()); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getOutputBytes(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getOutputCount()); - } - })), - TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getInvocations()); - } - })), - TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS); - } - })), - FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)", "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getFlowFilesRemoved()); - } - })), + BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)", + "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getBytesRead(); + } + })), + BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)", + "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getBytesWritten(); + } + })), + BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)", + "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getBytesRead() + status.getBytesWritten(); + } + })), + INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)", + "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getInputBytes(); + } + })), + INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)", + "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return Long.valueOf(status.getInputCount()); + } + })), + OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)", + "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getOutputBytes(); + } + })), + OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)", + "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return Long.valueOf(status.getOutputCount()); + } + })), + TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", + Formatter.COUNT, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return Long.valueOf(status.getInvocations()); + } + })), + TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)", + "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS); + } + })), + FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)", + "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return Long.valueOf(status.getFlowFilesRemoved()); + } + })), AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>( "averageLineageDuration", "Average Lineage Duration (5 mins)", http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java index 5ecd22e..f3cbb90 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java @@ -35,8 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Continually runs a Connectable as long as the processor has work to do. {@link #call()} will return - * <code>true</code> if the Connectable should be yielded, <code>false</code> otherwise. + * Continually runs a Connectable as long as the processor has work to do. + * {@link #call()} will return <code>true</code> if the Connectable should be + * yielded, <code>false</code> otherwise. */ public class ContinuallyRunConnectableTask implements Callable<Boolean> { @@ -60,7 +61,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> { if (!scheduleState.isScheduled()) { return false; } - + // Connectable should run if the following conditions are met: // 1. It is not yielded. // 2. It has incoming connections with FlowFiles queued or doesn't expect incoming connections @@ -106,7 +107,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> { // yield for just a bit. return true; } - - return false; // do not yield + + return false; // do not yield } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index cff8744..baed6ae 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -43,10 +43,10 @@ import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * Continually runs a processor as long as the processor has work to do. {@link #call()} will return - * <code>true</code> if the processor should be yielded, <code>false</code> otherwise. + * Continually runs a processor as long as the processor has work to do. + * {@link #call()} will return <code>true</code> if the processor should be + * yielded, <code>false</code> otherwise. */ public class ContinuallyRunProcessorTask implements Callable<Boolean> { @@ -61,7 +61,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { private final int numRelationships; public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode, - final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, + final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, final StandardProcessContext processContext) { this.schedulingAgent = schedulingAgent; @@ -163,9 +163,9 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { if (batch) { rawSession.commit(); } - + final long processingNanos = System.nanoTime() - startNanos; - + // if the processor is no longer scheduled to run and this is the last thread, // invoke the OnStopped methods if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { @@ -174,7 +174,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { flowController.heartbeat(); } } - + try { final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier()); procEvent.setProcessingNanos(processingNanos); @@ -188,7 +188,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { scheduleState.decrementActiveThreadCount(); } } - + return false; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java index 0c472c8..5724bb4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java @@ -42,7 +42,7 @@ public class ReportingTaskWrapper implements Runnable { taskNode.getReportingTask().onTrigger(taskNode.getReportingContext()); } catch (final Throwable t) { final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask()); - componentLog.error("Error running task {} due to {}", new Object[] {taskNode.getReportingTask(), t.toString()}); + componentLog.error("Error running task {} due to {}", new Object[]{taskNode.getReportingTask(), t.toString()}); if (componentLog.isDebugEnabled()) { componentLog.error("", t); } @@ -52,7 +52,9 @@ public class ReportingTaskWrapper implements Runnable { // invoke the OnStopped methods if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext()); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation( + OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, + taskNode.getReportingTask(), taskNode.getConfigurationContext()); } } } finally { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java index be79c5b..fccd10e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java @@ -76,7 +76,7 @@ public final class StringEncryptor { * Creates an instance of the nifi sensitive property encryptor. Validates * that the encryptor is actually working. * - * @return + * @return encryptor * @throws EncryptionException if any issues arise initializing or * validating the encryptor */ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java index 76e8e3e..3be178f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java @@ -27,9 +27,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; -/** - * @author unattributed - */ public final class FlowEngine extends ScheduledThreadPoolExecutor { private static final Logger logger = LoggerFactory.getLogger(FlowEngine.class); @@ -39,19 +36,20 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor { * * @param corePoolSize the maximum number of threads available to tasks * running in the engine. - * @param threadNamePrefix + * @param threadNamePrefix for naming the thread */ public FlowEngine(int corePoolSize, final String threadNamePrefix) { - this(corePoolSize, threadNamePrefix, false); + this(corePoolSize, threadNamePrefix, false); } - + /** * Creates a new instance of FlowEngine * * @param corePoolSize the maximum number of threads available to tasks * running in the engine. - * @param threadNamePrefix - * @param deamon if true, the thread pool will be populated with daemon threads, otherwise the threads will not be marked as daemon. + * @param threadNamePrefix for thread naming + * @param daemon if true, the thread pool will be populated with daemon + * threads, otherwise the threads will not be marked as daemon. */ public FlowEngine(int corePoolSize, final String threadNamePrefix, final boolean daemon) { super(corePoolSize); @@ -62,8 +60,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor { @Override public Thread newThread(final Runnable r) { final Thread t = defaultThreadFactory.newThread(r); - if ( daemon ) { - t.setDaemon(true); + if (daemon) { + t.setDaemon(true); } t.setName(threadNamePrefix + " Thread-" + threadIndex.incrementAndGet()); return t; @@ -75,8 +73,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor { * Hook method called by the running thread whenever a runnable task is * given to the thread to run. * - * @param thread - * @param runnable + * @param thread thread + * @param runnable runnable */ @Override protected void beforeExecute(final Thread thread, final Runnable runnable) { @@ -90,8 +88,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor { * execution of the runnable completed. Logs the fact of completion and any * errors that might have occured. * - * @param runnable - * @param throwable + * @param runnable runnable + * @param throwable throwable */ @Override protected void afterExecute(final Runnable runnable, final Throwable throwable) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java index 044541b..e8708bd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java @@ -180,7 +180,7 @@ public class VolatileBulletinRepository implements BulletinRepository { * bulletin strategy is employed, bulletins will not be persisted in this * repository and will sent to the specified strategy instead. * - * @param strategy + * @param strategy bulletin strategy */ public void overrideDefaultBulletinProcessing(final BulletinProcessingStrategy strategy) { Objects.requireNonNull(strategy);