http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ec25ab1..07e754e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -457,11 +457,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); instanceId = UUID.randomUUID().toString(); - if (remoteInputSocketPort == null){ + if (remoteInputSocketPort == null) { LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set"); externalSiteListener = null; } else if (isSiteToSiteSecure && sslContext == null) { - LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed."); + LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore " + + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed."); externalSiteListener = null; } else { // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol @@ -530,7 +531,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } }; } - + public void initializeFlow() throws IOException { writeLock.lock(); try { @@ -584,17 +585,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Causes any processors that were added to the flow with a 'delayStart' * flag of true to now start * </p> + * + * @param startDelayedComponents true if start */ public void onFlowInitialized(final boolean startDelayedComponents) { writeLock.lock(); try { - if ( startDelayedComponents ) { + if (startDelayedComponents) { LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size())); for (final Connectable connectable : startConnectablesAfterInitialization) { if (connectable.getScheduledState() == ScheduledState.DISABLED) { continue; } - + try { if (connectable instanceof ProcessorNode) { connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); @@ -603,14 +606,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } catch (final Throwable t) { LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()}); - if ( LOG.isDebugEnabled() ) { + if (LOG.isDebugEnabled()) { LOG.error("", t); } } } - + startConnectablesAfterInitialization.clear(); - + int startedTransmitting = 0; for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) { try { @@ -620,7 +623,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t}); } } - + LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting); startRemoteGroupPortsAfterInitialization.clear(); } else { @@ -635,7 +638,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); } } - + startConnectablesAfterInitialization.clear(); startRemoteGroupPortsAfterInitialization.clear(); } @@ -720,9 +723,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Creates a new Label * - * @param id - * @param text - * @return + * @param id identifier + * @param text label text + * @return new label * @throws NullPointerException if either argument is null */ public Label createLabel(final String id, final String text) { @@ -732,8 +735,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Creates a funnel * - * @param id - * @return + * @param id funnel id + * @return new funnel */ public Funnel createFunnel(final String id) { return new StandardFunnel(id.intern(), null, processScheduler); @@ -742,9 +745,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Creates a Port to use as an Input Port for a Process Group * - * @param id - * @param name - * @return + * @param id port identifier + * @param name port name + * @return new port * @throws NullPointerException if the ID or name is not unique * @throws IllegalStateException if an Input Port already exists with the * same name or id. @@ -759,9 +762,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Creates a Port to use as an Output Port for a Process Group * - * @param id - * @param name - * @return + * @param id port id + * @param name port name + * @return new port * @throws NullPointerException if the ID or name is not unique * @throws IllegalStateException if an Input Port already exists with the * same name or id. @@ -776,8 +779,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Creates a ProcessGroup with the given ID * - * @param id - * @return + * @param id group id + * @return new group * @throws NullPointerException if the argument is null */ public ProcessGroup createProcessGroup(final String id) { @@ -786,13 +789,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * <p> - * Creates a new ProcessorNode with the given type and identifier and initializes it invoking the - * methods annotated with {@link OnAdded}. + * Creates a new ProcessorNode with the given type and identifier and + * initializes it invoking the methods annotated with {@link OnAdded}. * </p> * - * @param type - * @param id - * @return + * @param type processor type + * @param id processor id + * @return new processor * @throws NullPointerException if either arg is null * @throws ProcessorInstantiationException if the processor cannot be * instantiated for any reason @@ -800,17 +803,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException { return createProcessor(type, id, true); } - + /** * <p> - * Creates a new ProcessorNode with the given type and identifier and optionally initializes it. + * Creates a new ProcessorNode with the given type and identifier and + * optionally initializes it. * </p> * * @param type the fully qualified Processor class name * @param id the unique ID of the Processor - * @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true}, - * will invoke methods annotated with the {@link OnAdded} annotation. - * @return + * @param firstTimeAdded whether or not this is the first time this + * Processor is added to the graph. If {@code true}, will invoke methods + * annotated with the {@link OnAdded} annotation. + * @return new processor node * @throws NullPointerException if either arg is null * @throws ProcessorInstantiationException if the processor cannot be * instantiated for any reason @@ -825,7 +830,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); - if ( firstTimeAdded ) { + if (firstTimeAdded) { try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, org.apache.nifi.processor.annotation.OnAdded.class, processor); } catch (final Exception e) { @@ -886,9 +891,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Gets the BulletinRepository for storing and retrieving Bulletins. - * - * @return + * @return the BulletinRepository for storing and retrieving Bulletins */ public BulletinRepository getBulletinRepository() { return bulletinRepository; @@ -902,9 +905,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Creates a Port to use as an Input Port for the root Process Group, which * is used for Site-to-Site communications * - * @param id - * @param name - * @return + * @param id port id + * @param name port name + * @return new port * @throws NullPointerException if the ID or name is not unique * @throws IllegalStateException if an Input Port already exists with the * same name or id. @@ -913,7 +916,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R id = requireNonNull(id).intern(); name = requireNonNull(name).intern(); verifyPortIdDoesNotExist(id); - return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); + return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, + userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); } /** @@ -921,9 +925,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * is used for Site-to-Site communications and will queue flow files waiting * to be delivered to remote instances * - * @param id - * @param name - * @return + * @param id port id + * @param name port name + * @return new port * @throws NullPointerException if the ID or name is not unique * @throws IllegalStateException if an Input Port already exists with the * same name or id. @@ -932,17 +936,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R id = requireNonNull(id).intern(); name = requireNonNull(name).intern(); verifyPortIdDoesNotExist(id); - return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); + return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, + userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); } /** * Creates a new Remote Process Group with the given ID that points to the * given URI * - * @param id - * @param uri - * @return - * + * @param id group id + * @param uri group uri + * @return new group * @throws NullPointerException if either argument is null * @throws IllegalArgumentException if <code>uri</code> is not a valid URI. */ @@ -954,8 +958,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Verifies that no output port exists with the given id or name. If this * does not hold true, throws an IllegalStateException * - * @param id - * @throws IllegalStateException + * @param id port identifier + * @throws IllegalStateException port already exists */ private void verifyPortIdDoesNotExist(final String id) { Port port = rootGroup.findOutputPort(id); @@ -985,7 +989,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Sets the name for the Root Group, which also changes the name for the * controller. * - * @param name + * @param name of root group */ public void setName(final String name) { readLock.lock(); @@ -997,10 +1001,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Gets the comments of this controller, which is also the comment of the - * Root Group. - * - * @return + * @return the comments of this controller, which is also the comment of the + * Root Group */ public String getComments() { readLock.lock(); @@ -1012,10 +1014,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Sets the comment for the Root Group, which also changes the comment for - * the controller. + * Sets the comments * - * @param comments + * @param comments for the Root Group, which also changes the comment for + * the controller */ public void setComments(final String comments) { readLock.lock(); @@ -1075,23 +1077,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Trigger any processors' methods marked with @OnShutdown to be called rootGroup.shutdown(); - + // invoke any methods annotated with @OnShutdown on Controller Services - for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) { + for (final ControllerServiceNode serviceNode : getAllControllerServices()) { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext); } } - + // invoke any methods annotated with @OnShutdown on Reporting Tasks - for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) { + for (final ReportingTaskNode taskNode : getAllReportingTasks()) { final ConfigurationContext configContext = taskNode.getConfigurationContext(); try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext); } } - + try { this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS); this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS); @@ -1108,7 +1110,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) { LOG.info("Controller has been terminated successfully."); } else { - LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop. Might need to kill the program manually."); + LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that " + + "will take an indeterminate amount of time to stop. Might need to kill the program manually."); } if (externalSiteListener != null) { @@ -1118,24 +1121,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (flowFileSwapManager != null) { flowFileSwapManager.shutdown(); } - - if ( processScheduler != null ) { - processScheduler.shutdown(); + + if (processScheduler != null) { + processScheduler.shutdown(); } - - if ( contentRepository != null ) { + + if (contentRepository != null) { contentRepository.shutdown(); } - - if ( provenanceEventRepository != null ) { - try { - provenanceEventRepository.close(); - } catch (final IOException ioe) { - LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString()); - if ( LOG.isDebugEnabled() ) { - LOG.warn("", ioe); - } - } + + if (provenanceEventRepository != null) { + try { + provenanceEventRepository.close(); + } catch (final IOException ioe) { + LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString()); + if (LOG.isDebugEnabled()) { + LOG.warn("", ioe); + } + } } } finally { writeLock.unlock(); @@ -1145,8 +1148,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Serializes the current state of the controller to the given OutputStream * - * @param serializer - * @param os + * @param serializer serializer + * @param os stream * @throws FlowSerializationException if serialization of the flow fails for * any reason */ @@ -1165,7 +1168,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * For more details, see * {@link FlowSynchronizer#sync(FlowController, DataFlow)}. * - * @param synchronizer + * @param synchronizer synchronizer * @param dataFlow the flow to load the controller with. If the flow is null * or zero length, then the controller must not have a flow or else an * UninheritableFlowException will be thrown. @@ -1294,8 +1297,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * in DTO that is <code>null</code> (with the exception of the required ID) * will be ignored. * - * @param dto - * @return a fully-populated DTO representing the newly updated ProcessGroup + * @param dto group * @throws ProcessorInstantiationException * * @throws IllegalStateException if no process group can be found with the @@ -1331,7 +1333,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * part of the current flow. This is going create a template based on a * snippet of this flow. * - * @param dto + * @param dto template * @return a copy of the given DTO * @throws IOException if an I/O error occurs when persisting the Template * @throws NullPointerException if the DTO is null @@ -1346,7 +1348,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Removes all templates from this controller * - * @throws IOException + * @throws IOException ioe */ public void clearTemplates() throws IOException { templateManager.clear(); @@ -1356,20 +1358,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Imports the specified template into this controller. The contents of this * template may have come from another NiFi instance. * - * @param dto - * @return - * @throws IOException + * @param dto dto + * @return template + * @throws IOException ioe */ public Template importTemplate(final TemplateDTO dto) throws IOException { return templateManager.importTemplate(dto); } /** - * Returns the template with the given ID, or <code>null</code> if no - * template exists with the given ID. - * - * @param id - * @return + * @param id identifier + * @return the template with the given ID, or <code>null</code> if no + * template exists with the given ID */ public Template getTemplate(final String id) { return templateManager.getTemplate(id); @@ -1380,9 +1380,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns all templates that this controller knows about. - * - * @return + * @return all templates that this controller knows about */ public Collection<Template> getTemplates() { return templateManager.getTemplates(); @@ -1411,8 +1409,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Creates an instance of the given snippet and adds the components to the * given group * - * @param group - * @param dto + * @param group group + * @param dto dto * * @throws NullPointerException if either argument is null * @throws IllegalStateException if the snippet is not valid because a @@ -1432,27 +1430,27 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // // Instantiate Controller Services // - for ( final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices() ) { + for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) { final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), true); - + serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData()); serviceNode.setComments(controllerServiceDTO.getComments()); serviceNode.setName(controllerServiceDTO.getName()); } - + // configure controller services. We do this after creating all of them in case 1 service // references another service. - for ( final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices() ) { + for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) { final String serviceId = controllerServiceDTO.getId(); final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId); - - for ( final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet() ) { - if ( entry.getValue() != null ) { + + for (final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet()) { + if (entry.getValue() != null) { serviceNode.setProperty(entry.getKey(), entry.getValue()); } } } - + // // Instantiate the labels // @@ -1467,7 +1465,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R group.addLabel(label); } - // // Instantiate the funnels for (final FunnelDTO funnelDTO : dto.getFunnels()) { final Funnel funnel = createFunnel(funnelDTO.getId()); @@ -1604,7 +1601,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R group.addRemoteProcessGroup(remoteGroup); } - // + // // Instantiate ProcessGroups // for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) { @@ -1640,12 +1637,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final Connectable source; final Connectable destination; - // locate the source and destination connectable. if this is a remote port - // we need to locate the remote process groups. otherwise we need to + // locate the source and destination connectable. if this is a remote port + // we need to locate the remote process groups. otherwise we need to // find the connectable given its parent group. // NOTE: (getConnectable returns ANY connectable, when the parent is - // not this group only input ports or output ports should be returned. if something - // other than a port is returned, an exception will be thrown when adding the + // not this group only input ports or output ports should be returned. if something + // other than a port is returned, an exception will be thrown when adding the // connection below.) // see if the source connectable is a remote port if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) { @@ -1711,8 +1708,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Converts a set of ports into a set of remote process group ports. * - * @param ports - * @return + * @param ports ports + * @return group descriptors */ private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) { Set<RemoteProcessGroupPortDescriptor> remotePorts = null; @@ -1738,8 +1735,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Returns the parent of the specified Connectable. This only considers this * group and any direct child sub groups. * - * @param parentGroupId - * @return + * @param parentGroupId group id + * @return parent group */ private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) { if (areGroupsSame(group.getIdentifier(), parentGroupId)) { @@ -1770,8 +1767,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * {@link ProcessorInstantiationException} will be thrown. * </p> * - * @param group - * @param templateContents + * @param group group + * @param templateContents contents */ private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) { // validate the names of Input Ports @@ -1816,7 +1813,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new IllegalStateException("Invalid Processor Type: " + proc.getType()); } } - + final Set<ControllerServiceDTO> controllerServices = templateContents.getControllerServices(); if (controllerServices != null) { for (final ControllerServiceDTO service : controllerServices) { @@ -1841,8 +1838,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Recursively finds all ProcessorDTO's * - * @param group - * @return + * @param group group + * @return processor dto set */ private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) { final Set<ProcessorDTO> procs = new HashSet<>(); @@ -1859,8 +1856,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Recursively finds all ConnectionDTO's * - * @param group - * @return + * @param group group + * @return connection dtos */ private Set<ConnectionDTO> findAllConnections(final ProcessGroupDTO group) { final Set<ConnectionDTO> conns = new HashSet<>(); @@ -1879,11 +1876,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // /** * Indicates whether or not the two ID's point to the same ProcessGroup. If - * either id is null, will return <code>false</code. + * either id is null, will return <code>false</code>. * - * @param id1 - * @param id2 - * @return + * @param id1 group id + * @param id2 other group id + * @return true if same */ public boolean areGroupsSame(final String id1, final String id2) { if (id1 == null || id2 == null) { @@ -1999,7 +1996,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Returns the ProcessGroup with the given ID * - * @param id + * @param id group * @return the process group or null if not group is found */ private ProcessGroup lookupGroup(final String id) { @@ -2013,7 +2010,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Returns the ProcessGroup with the given ID * - * @param id + * @param id group id * @return the process group or null if not group is found */ public ProcessGroup getGroup(final String id) { @@ -2083,7 +2080,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R bytesSent += procStat.getBytesSent(); } - // set status for local child groups + // set status for local child groups final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>(); status.setProcessGroupStatus(localChildGroupStatusCollection); for (final ProcessGroup childGroup : group.getProcessGroups()) { @@ -2441,8 +2438,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // determine the run status and get any validation errors... must check - // is valid when not disabled since a processors validity could change due - // to environmental conditions (property configured with a file path and + // is valid when not disabled since a processors validity could change due + // to environmental conditions (property configured with a file path and // the file being externally removed) if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { status.setRunStatus(RunStatus.Disabled); @@ -2548,17 +2545,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException { return createReportingTask(type, true); } - + public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { - return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded); + return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded); } - + @Override public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException { if (type == null || id == null) { throw new NullPointerException(); } - + ReportingTask task = null; final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { @@ -2585,8 +2582,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory); taskNode.setName(task.getClass().getSimpleName()); - - if ( firstTimeAdded ) { + + if (firstTimeAdded) { final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this); @@ -2596,14 +2593,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } catch (final InitializationException ie) { throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie); } - + try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task); } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e); } } - + reportingTasks.put(id, taskNode); return taskNode; } @@ -2620,10 +2617,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } reportingTaskNode.verifyCanStart(); - processScheduler.schedule(reportingTaskNode); + processScheduler.schedule(reportingTaskNode); } - @Override public void stopReportingTask(final ReportingTaskNode reportingTaskNode) { if (isTerminated()) { @@ -2637,32 +2633,32 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public void removeReportingTask(final ReportingTaskNode reportingTaskNode) { final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier()); - if ( existing == null || existing != reportingTaskNode ) { + if (existing == null || existing != reportingTaskNode) { throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow"); } - + reportingTaskNode.verifyCanDelete(); - + try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext()); } - - for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) { + + for (final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet()) { final PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.getControllerServiceDefinition() != null ) { + if (descriptor.getControllerServiceDefinition() != null) { final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue(); - if ( value != null ) { + if (value != null) { final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value); - if ( serviceNode != null ) { + if (serviceNode != null) { serviceNode.removeReference(reportingTaskNode); } } } } - + reportingTasks.remove(reportingTaskNode.getIdentifier()); } - + @Override public Set<ReportingTaskNode> getAllReportingTasks() { return new HashSet<>(reportingTasks.values()); @@ -2672,60 +2668,60 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { return controllerServiceProvider.createControllerService(type, id, firstTimeAdded); } - + @Override public void enableReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanEnable(); processScheduler.enableReportingTask(reportingTaskNode); } - + @Override public void disableReportingTask(final ReportingTaskNode reportingTaskNode) { reportingTaskNode.verifyCanDisable(); processScheduler.disableReportingTask(reportingTaskNode); } - + @Override public void disableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.disableReferencingServices(serviceNode); } - + @Override public void enableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.enableReferencingServices(serviceNode); } - + @Override public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { controllerServiceProvider.scheduleReferencingComponents(serviceNode); } - + @Override public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { controllerServiceProvider.unscheduleReferencingComponents(serviceNode); } - + @Override public void enableControllerService(final ControllerServiceNode serviceNode) { controllerServiceProvider.enableControllerService(serviceNode); } - + @Override public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { controllerServiceProvider.enableControllerServices(serviceNodes); } - + @Override public void disableControllerService(final ControllerServiceNode serviceNode) { serviceNode.verifyCanDisable(); controllerServiceProvider.disableControllerService(serviceNode); } - + @Override public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode); } - + @Override public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode); @@ -2735,12 +2731,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode); } - + @Override public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) { controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode); } - + @Override public ControllerService getControllerService(final String serviceIdentifier) { return controllerServiceProvider.getControllerService(serviceIdentifier); @@ -2765,21 +2761,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isControllerServiceEnabling(final String serviceIdentifier) { return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier); } - + @Override public String getControllerServiceName(final String serviceIdentifier) { - return controllerServiceProvider.getControllerServiceName(serviceIdentifier); + return controllerServiceProvider.getControllerServiceName(serviceIdentifier); } + @Override public void removeControllerService(final ControllerServiceNode serviceNode) { controllerServiceProvider.removeControllerService(serviceNode); } - + @Override public Set<ControllerServiceNode> getAllControllerServices() { - return controllerServiceProvider.getAllControllerServices(); + return controllerServiceProvider.getAllControllerServices(); } - + // // Counters // @@ -2842,7 +2839,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Starts heartbeating to the cluster. May only be called if the instance * was constructed for a clustered environment. * - * @throws IllegalStateException + * @throws IllegalStateException if not configured for clustering */ public void startHeartbeating() throws IllegalStateException { if (!configuredForClustering) { @@ -2893,7 +2890,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * constructed for a clustered environment. If the controller was not * heartbeating, then this method has no effect. * - * @throws IllegalStateException + * @throws IllegalStateException if not clustered */ public void stopHeartbeating() throws IllegalStateException { @@ -2925,9 +2922,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns true if the instance is heartbeating; false otherwise. - * - * @return + * @return true if the instance is heartbeating; false otherwise */ public boolean isHeartbeating() { readLock.lock(); @@ -2940,9 +2935,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the number of seconds to wait between successive heartbeats. - * - * @return + * @return the number of seconds to wait between successive heartbeats */ public int getHeartbeatDelaySeconds() { readLock.lock(); @@ -2996,12 +2989,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } /** - * Returns the DN of the Cluster Manager that we are currently connected to, + * @return the DN of the Cluster Manager that we are currently connected to, * if available. This will return null if the instance is not clustered or * if the instance is clustered but the NCM's DN is not available - for - * instance, if cluster communications are not secure. - * - * @return + * instance, if cluster communications are not secure */ public String getClusterManagerDN() { readLock.lock(); @@ -3016,7 +3007,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Sets whether this instance is clustered. Clustered means that a node is * either connected or trying to connect to the cluster. * - * @param clustered + * @param clustered true if clustered * @param clusterInstanceId if clustered is true, indicates the InstanceID * of the Cluster Manager */ @@ -3028,7 +3019,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * Sets whether this instance is clustered. Clustered means that a node is * either connected or trying to connect to the cluster. * - * @param clustered + * @param clustered true if clustered * @param clusterInstanceId if clustered is true, indicates the InstanceID * of the Cluster Manager * @param clusterManagerDn the DN of the NCM @@ -3288,7 +3279,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return "Failed to determine whether or not content was available in Content Repository due to " + ioe.toString(); } - // Make sure that the source queue exists + // Make sure that the source queue exists if (event.getSourceQueueIdentifier() == null) { return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue"; } @@ -3339,7 +3330,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not contain the required Content Claim"); } - // Make sure that the source queue exists + // Make sure that the source queue exists if (event.getSourceQueueIdentifier() == null) { throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue"); } @@ -3358,7 +3349,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // Create the ContentClaim - final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); + final ContentClaim claim = contentClaimManager.newContentClaim(event.getPreviousContentClaimContainer(), + event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); // Increment Claimant Count, since we will now be referencing the Content Claim contentClaimManager.incrementClaimantCount(claim); @@ -3544,7 +3536,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (bulletin.getGroupId() == null) { escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); } else { - escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); + escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), + bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); } } else { escapedBulletin = bulletin;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java index 85ad159..144395c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java @@ -79,39 +79,39 @@ public class FlowFromDOMFactory { return styles; } - + public static ControllerServiceDTO getControllerService(final Element element, final StringEncryptor encryptor) { - final ControllerServiceDTO dto = new ControllerServiceDTO(); - - dto.setId(getString(element, "id")); - dto.setName(getString(element, "name")); - dto.setComments(getString(element, "comment")); - dto.setType(getString(element, "class")); - - final boolean enabled = getBoolean(element, "enabled"); - dto.setState(enabled ? ControllerServiceState.ENABLED.name() : ControllerServiceState.DISABLED.name()); - + final ControllerServiceDTO dto = new ControllerServiceDTO(); + + dto.setId(getString(element, "id")); + dto.setName(getString(element, "name")); + dto.setComments(getString(element, "comment")); + dto.setType(getString(element, "class")); + + final boolean enabled = getBoolean(element, "enabled"); + dto.setState(enabled ? ControllerServiceState.ENABLED.name() : ControllerServiceState.DISABLED.name()); + dto.setProperties(getProperties(element, encryptor)); dto.setAnnotationData(getString(element, "annotationData")); return dto; } - + public static ReportingTaskDTO getReportingTask(final Element element, final StringEncryptor encryptor) { - final ReportingTaskDTO dto = new ReportingTaskDTO(); - - dto.setId(getString(element, "id")); - dto.setName(getString(element, "name")); - dto.setComments(getString(element, "comment")); - dto.setType(getString(element, "class")); - dto.setSchedulingPeriod(getString(element, "schedulingPeriod")); - dto.setState(getString(element, "scheduledState")); - dto.setSchedulingStrategy(getString(element, "schedulingStrategy")); - - dto.setProperties(getProperties(element, encryptor)); - dto.setAnnotationData(getString(element, "annotationData")); - - return dto; + final ReportingTaskDTO dto = new ReportingTaskDTO(); + + dto.setId(getString(element, "id")); + dto.setName(getString(element, "name")); + dto.setComments(getString(element, "comment")); + dto.setType(getString(element, "class")); + dto.setSchedulingPeriod(getString(element, "schedulingPeriod")); + dto.setState(getString(element, "scheduledState")); + dto.setSchedulingStrategy(getString(element, "schedulingStrategy")); + + dto.setProperties(getProperties(element, encryptor)); + dto.setAnnotationData(getString(element, "annotationData")); + + return dto; } public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) { @@ -383,7 +383,7 @@ public class FlowFromDOMFactory { } private static LinkedHashMap<String, String> getProperties(final Element element, final StringEncryptor encryptor) { - final LinkedHashMap<String, String> properties = new LinkedHashMap<>(); + final LinkedHashMap<String, String> properties = new LinkedHashMap<>(); final List<Element> propertyNodeList = getChildrenByTagName(element, "property"); for (final Element propertyElement : propertyNodeList) { final String name = getString(propertyElement, "name"); @@ -392,7 +392,7 @@ public class FlowFromDOMFactory { } return properties; } - + private static String getString(final Element element, final String childElementName) { final List<Element> nodeList = getChildrenByTagName(element, childElementName); if (nodeList == null || nodeList.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java index 42d7f1c..7cc3039 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java @@ -42,13 +42,13 @@ public class FlowUnmarshaller { * Flow Configuration schema and returns a FlowSnippetDTO representing the * flow * - * @param flowContents - * @param encryptor - * @return + * @param flowContents contents + * @param encryptor encryptor + * @return snippet dto * @throws NullPointerException if <code>flowContents</code> is null - * @throws IOException - * @throws SAXException - * @throws ParserConfigurationException + * @throws IOException ioe + * @throws SAXException sax + * @throws ParserConfigurationException pe */ public static FlowSnippetDTO unmarshal(final byte[] flowContents, final StringEncryptor encryptor) throws IOException, SAXException, ParserConfigurationException { if (Objects.requireNonNull(flowContents).length == 0) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java index 7cd9d3b..c6aa395 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java @@ -80,17 +80,17 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount()); addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount()); addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup"); - + final Element controllerServicesNode = doc.createElement("controllerServices"); rootNode.appendChild(controllerServicesNode); - for ( final ControllerServiceNode serviceNode : controller.getAllControllerServices() ) { - addControllerService(controllerServicesNode, serviceNode, encryptor); + for (final ControllerServiceNode serviceNode : controller.getAllControllerServices()) { + addControllerService(controllerServicesNode, serviceNode, encryptor); } - + final Element reportingTasksNode = doc.createElement("reportingTasks"); rootNode.appendChild(reportingTasksNode); - for ( final ReportingTaskNode taskNode : controller.getAllReportingTasks() ) { - addReportingTask(reportingTasksNode, taskNode, encryptor); + for (final ReportingTaskNode taskNode : controller.getAllReportingTasks()) { + addReportingTask(reportingTasksNode, taskNode, encryptor); } final DOMSource domSource = new DOMSource(doc); @@ -314,15 +314,15 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor); - + for (final Relationship rel : processor.getAutoTerminatedRelationships()) { addTextElement(element, "autoTerminatedRelationship", rel.getName()); } } - + private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final StringEncryptor encryptor) { - final Document doc = element.getOwnerDocument(); - for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { + final Document doc = element.getOwnerDocument(); + for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { final PropertyDescriptor descriptor = entry.getKey(); String value = entry.getValue(); @@ -406,38 +406,37 @@ public class StandardFlowSerializer implements FlowSerializer { parentElement.appendChild(element); } - public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) { - final Element serviceElement = element.getOwnerDocument().createElement("controllerService"); - addTextElement(serviceElement, "id", serviceNode.getIdentifier()); - addTextElement(serviceElement, "name", serviceNode.getName()); - addTextElement(serviceElement, "comment", serviceNode.getComments()); - addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName()); - - final ControllerServiceState state = serviceNode.getState(); - final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); + final Element serviceElement = element.getOwnerDocument().createElement("controllerService"); + addTextElement(serviceElement, "id", serviceNode.getIdentifier()); + addTextElement(serviceElement, "name", serviceNode.getName()); + addTextElement(serviceElement, "comment", serviceNode.getComments()); + addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName()); + + final ControllerServiceState state = serviceNode.getState(); + final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); addTextElement(serviceElement, "enabled", String.valueOf(enabled)); - + addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor); - - element.appendChild(serviceElement); + + element.appendChild(serviceElement); } - + public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final StringEncryptor encryptor) { - final Element taskElement = element.getOwnerDocument().createElement("reportingTask"); - addTextElement(taskElement, "id", taskNode.getIdentifier()); - addTextElement(taskElement, "name", taskNode.getName()); - addTextElement(taskElement, "comment", taskNode.getComments()); - addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName()); + final Element taskElement = element.getOwnerDocument().createElement("reportingTask"); + addTextElement(taskElement, "id", taskNode.getIdentifier()); + addTextElement(taskElement, "name", taskNode.getName()); + addTextElement(taskElement, "comment", taskNode.getComments()); + addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName()); addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod()); addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name()); addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name()); - - addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor); - - element.appendChild(taskElement); + + addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor); + + element.appendChild(taskElement); } - + private static void addTextElement(final Element element, final String name, final long value) { addTextElement(element, name, String.valueOf(value)); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index fcfee83..1511293 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -338,7 +338,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { case RECONNECTION_REQUEST: // Suspend heartbeats until we've reconnected. Otherwise, // we may send a heartbeat while we are still in the process of - // connecting, which will cause the Cluster Manager to mark us + // connecting, which will cause the Cluster Manager to mark us // as "Connected," which becomes problematic as the FlowController's lock // may still be held, causing this node to take a long time to respond to requests. controller.suspendHeartbeats(); @@ -389,7 +389,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { /* * Attempt to connect to the cluster. If the manager is able to * provide a data flow, then the manager will send a connection - * response. If the manager was unable to be located, then + * response. If the manager was unable to be located, then * the response will be null and we should load the local dataflow * and heartbeat until a manager is located. */ @@ -411,10 +411,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler { controller.setConnected(false); /* - * Start heartbeating. Heartbeats will fail because we can't reach - * the manager, but when we locate the manager, the node will - * reconnect and establish a connection to the cluster. The - * heartbeat is the trigger that will cause the manager to + * Start heartbeating. Heartbeats will fail because we can't reach + * the manager, but when we locate the manager, the node will + * reconnect and establish a connection to the cluster. The + * heartbeat is the trigger that will cause the manager to * issue a reconnect request. */ controller.startHeartbeating(); @@ -515,7 +515,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { logger.info("Node reconnected."); } catch (final Exception ex) { - // disconnect controller + // disconnect controller if (controller.isClustered()) { disconnect(); } @@ -618,7 +618,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // load the controller tasks // dao.loadReportingTasks(controller); - // initialize the flow controller.initializeFlow(); @@ -638,11 +637,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // send connection request to cluster manager /* - * Try to get a current copy of the cluster's dataflow from the manager - * for ten times, sleeping between attempts. Ten times should be + * Try to get a current copy of the cluster's dataflow from the manager + * for ten times, sleeping between attempts. Ten times should be * enough because the manager will register the node as connecting * and therefore, no other changes to the cluster flow can occur. - * + * * However, the manager needs to obtain a current data flow within * maxAttempts * tryLaterSeconds or else the node will fail to startup. */ @@ -813,7 +812,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { writeLock.lock(); try { dao.save(controller, holder.shouldArchive); - // Nulling it out if it is still set to our current SaveHolder. Otherwise leave it alone because it means + // Nulling it out if it is still set to our current SaveHolder. Otherwise leave it alone because it means // another save is already pending. final boolean noSavePending = StandardFlowService.this.saveHolder.compareAndSet(holder, null); logger.info("Saved flow controller {} // Another save pending = {}", controller, !noSavePending); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 201482c..b66bedc 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -129,7 +129,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } @Override - public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException { + public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor) + throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException { // get the controller's root group final ProcessGroup rootGroup = controller.getGroup(controller.getRootGroupId()); @@ -173,20 +174,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks"); final List<Element> taskElements; - if ( reportingTasksElement == null ) { + if (reportingTasksElement == null) { taskElements = Collections.emptyList(); } else { taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); } - + final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices"); final List<Element> controllerServiceElements; - if ( controllerServicesElement == null ) { + if (controllerServicesElement == null) { controllerServiceElements = Collections.emptyList(); } else { controllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); } - + logger.trace("Parsing process group from DOM"); final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor); @@ -230,14 +231,14 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // create document by parsing proposed flow bytes logger.trace("Parsing proposed flow bytes as DOM document"); final Document configuration = parseFlowBytes(proposedFlow.getFlow()); - + // attempt to sync controller with proposed flow try { if (configuration != null) { synchronized (configuration) { // get the root element final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0); - + // set controller config logger.trace("Updating flow config"); final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount"); @@ -248,23 +249,23 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3); controller.setMaxEventDrivenThreadCount(maxThreadCount / 3); } - + // get the root group XML element final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); - + final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices"); - if ( controllerServicesElement != null ) { - final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); - - if ( !initialized || existingFlowEmpty ) { - ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState); - } else { - for ( final Element serviceElement : serviceElements ) { - updateControllerService(controller, serviceElement, encryptor); - } - } + if (controllerServicesElement != null) { + final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); + + if (!initialized || existingFlowEmpty) { + ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState); + } else { + for (final Element serviceElement : serviceElements) { + updateControllerService(controller, serviceElement, encryptor); + } + } } - + // if this controller isn't initialized or its emtpy, add the root group, otherwise update if (!initialized || existingFlowEmpty) { logger.trace("Adding root process group"); @@ -273,21 +274,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { logger.trace("Updating root process group"); updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); } - + final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks"); - if ( reportingTasksElement != null ) { - final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); - for ( final Element taskElement : taskElements ) { - if ( !initialized || existingFlowEmpty ) { - addReportingTask(controller, taskElement, encryptor); - } else { - updateReportingTask(controller, taskElement, encryptor); - } - } + if (reportingTasksElement != null) { + final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); + for (final Element taskElement : taskElements) { + if (!initialized || existingFlowEmpty) { + addReportingTask(controller, taskElement, encryptor); + } else { + updateReportingTask(controller, taskElement, encryptor); + } + } } } } - + logger.trace("Synching templates"); if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) { // need to load templates @@ -370,105 +371,104 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { return baos.toByteArray(); } - - + private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) { - final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); - - final ControllerServiceState dtoState = ControllerServiceState.valueOf(dto.getState()); + final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor); + + final ControllerServiceState dtoState = ControllerServiceState.valueOf(dto.getState()); final boolean dtoEnabled = (dtoState == ControllerServiceState.ENABLED || dtoState == ControllerServiceState.ENABLING); - + final ControllerServiceNode serviceNode = controller.getControllerServiceNode(dto.getId()); final ControllerServiceState serviceState = serviceNode.getState(); final boolean serviceEnabled = (serviceState == ControllerServiceState.ENABLED || serviceState == ControllerServiceState.ENABLING); - - if (dtoEnabled && !serviceEnabled) { - controller.enableControllerService(controller.getControllerServiceNode(dto.getId())); - } else if (!dtoEnabled && serviceEnabled) { - controller.disableControllerService(controller.getControllerServiceNode(dto.getId())); - } + + if (dtoEnabled && !serviceEnabled) { + controller.enableControllerService(controller.getControllerServiceNode(dto.getId())); + } else if (!dtoEnabled && serviceEnabled) { + controller.disableControllerService(controller.getControllerServiceNode(dto.getId())); + } } - + private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException { - final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor); - - final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false); - reportingTask.setName(dto.getName()); - reportingTask.setComments(dto.getComments()); - reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod()); - reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy())); - - reportingTask.setAnnotationData(dto.getAnnotationData()); - + final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor); + + final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false); + reportingTask.setName(dto.getName()); + reportingTask.setComments(dto.getComments()); + reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod()); + reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy())); + + reportingTask.setAnnotationData(dto.getAnnotationData()); + for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { if (entry.getValue() == null) { - reportingTask.removeProperty(entry.getKey()); + reportingTask.removeProperty(entry.getKey()); } else { - reportingTask.setProperty(entry.getKey(), entry.getValue()); + reportingTask.setProperty(entry.getKey(), entry.getValue()); } } - + final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller); - + try { reportingTask.getReportingTask().initialize(config); } catch (final InitializationException ie) { throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie); } - - if ( autoResumeState ) { - if ( ScheduledState.RUNNING.name().equals(dto.getState()) ) { - try { - controller.startReportingTask(reportingTask); - } catch (final Exception e) { - logger.error("Failed to start {} due to {}", reportingTask, e); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin( - "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e)); - } - } else if ( ScheduledState.DISABLED.name().equals(dto.getState()) ) { - try { - controller.disableReportingTask(reportingTask); - } catch (final Exception e) { - logger.error("Failed to mark {} as disabled due to {}", reportingTask, e); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin( - "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e)); - } - } + + if (autoResumeState) { + if (ScheduledState.RUNNING.name().equals(dto.getState())) { + try { + controller.startReportingTask(reportingTask); + } catch (final Exception e) { + logger.error("Failed to start {} due to {}", reportingTask, e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin( + "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + reportingTask + " due to " + e)); + } + } else if (ScheduledState.DISABLED.name().equals(dto.getState())) { + try { + controller.disableReportingTask(reportingTask); + } catch (final Exception e) { + logger.error("Failed to mark {} as disabled due to {}", reportingTask, e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin( + "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + reportingTask + " as disabled due to " + e)); + } + } } } private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) { - final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor); - final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId()); - + final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor); + final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId()); + if (!taskNode.getScheduledState().name().equals(dto.getState())) { try { switch (ScheduledState.valueOf(dto.getState())) { case DISABLED: - if ( taskNode.isRunning() ) { - controller.stopReportingTask(taskNode); - } - controller.disableReportingTask(taskNode); + if (taskNode.isRunning()) { + controller.stopReportingTask(taskNode); + } + controller.disableReportingTask(taskNode); break; case RUNNING: - if ( taskNode.getScheduledState() == ScheduledState.DISABLED ) { - controller.enableReportingTask(taskNode); - } - controller.startReportingTask(taskNode); + if (taskNode.getScheduledState() == ScheduledState.DISABLED) { + controller.enableReportingTask(taskNode); + } + controller.startReportingTask(taskNode); break; case STOPPED: if (taskNode.getScheduledState() == ScheduledState.DISABLED) { - controller.enableReportingTask(taskNode); + controller.enableReportingTask(taskNode); } else if (taskNode.getScheduledState() == ScheduledState.RUNNING) { - controller.stopReportingTask(taskNode); + controller.stopReportingTask(taskNode); } break; } @@ -486,9 +486,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } } - - - private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException { + + private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) + throws ProcessorInstantiationException { // get the parent group ID final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier(); @@ -698,7 +698,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { return new Position(dto.getX(), dto.getY()); } - private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller) throws ProcessorInstantiationException { + private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller) + throws ProcessorInstantiationException { final ProcessorConfigDTO config = processorDTO.getConfig(); procNode.setPosition(toPosition(processorDTO.getPosition())); procNode.setName(processorDTO.getName()); @@ -747,7 +748,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } - private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException { + private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) + throws ProcessorInstantiationException { // get the parent group ID final String parentId = (parentGroup == null) ? null : parentGroup.getIdentifier(); @@ -866,7 +868,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final FunnelDTO funnelDTO = FlowFromDOMFactory.getFunnel(funnelElement); final Funnel funnel = controller.createFunnel(funnelDTO.getId()); funnel.setPosition(toPosition(funnelDTO.getPosition())); - + // Since this is called during startup, we want to add the funnel without enabling it // and then tell the controller to enable it. This way, if the controller is not fully // initialized, the starting of the funnel is delayed until the controller is ready. @@ -1031,7 +1033,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { * Returns true if the given controller can inherit the proposed flow * without orphaning flow files. * - * @param existingFlow + * @param existingFlow flow * @param controller the running controller * @param proposedFlow the flow to inherit * @@ -1081,7 +1083,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { * Returns true if the given controller can inherit the proposed flow * without orphaning flow files. * - * @param existingFlow + * @param existingFlow flow * @param proposedFlow the flow to inherit * * @return null if the controller can inherit the specified flow, an