http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java deleted file mode 100644 index 346e801..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java +++ /dev/null @@ -1,3579 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import static java.util.Objects.requireNonNull; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import javax.net.ssl.SSLContext; - -import org.apache.nifi.admin.service.UserService; -import org.apache.nifi.cluster.BulletinsPayload; -import org.apache.nifi.cluster.HeartbeatPayload; -import org.apache.nifi.cluster.protocol.DataFlow; -import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeBulletins; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.NodeProtocolSender; -import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Funnel; -import org.apache.nifi.connectable.LocalPort; -import org.apache.nifi.connectable.Port; -import org.apache.nifi.connectable.Position; -import org.apache.nifi.connectable.Size; -import org.apache.nifi.connectable.StandardConnection; -import org.apache.nifi.controller.exception.CommunicationsException; -import org.apache.nifi.controller.exception.ProcessorInstantiationException; -import org.apache.nifi.controller.exception.ProcessorLifeCycleException; -import org.apache.nifi.controller.label.Label; -import org.apache.nifi.controller.label.StandardLabel; -import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; -import org.apache.nifi.controller.reporting.StandardReportingTaskNode; -import org.apache.nifi.controller.repository.ContentRepository; -import org.apache.nifi.controller.repository.CounterRepository; -import org.apache.nifi.controller.repository.FlowFileEvent; -import org.apache.nifi.controller.repository.FlowFileEventRepository; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.controller.repository.FlowFileRepository; -import org.apache.nifi.controller.repository.FlowFileSwapManager; -import org.apache.nifi.controller.repository.QueueProvider; -import org.apache.nifi.controller.repository.RepositoryRecord; -import org.apache.nifi.controller.repository.RepositoryStatusReport; -import org.apache.nifi.controller.repository.StandardCounterRepository; -import org.apache.nifi.controller.repository.StandardFlowFileRecord; -import org.apache.nifi.controller.repository.StandardRepositoryRecord; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ContentClaimManager; -import org.apache.nifi.controller.repository.claim.ContentDirection; -import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; -import org.apache.nifi.controller.repository.io.LimitedInputStream; -import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent; -import org.apache.nifi.controller.scheduling.ProcessContextFactory; -import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; -import org.apache.nifi.controller.scheduling.StandardProcessScheduler; -import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; -import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.controller.service.StandardControllerServiceProvider; -import org.apache.nifi.controller.status.ConnectionStatus; -import org.apache.nifi.controller.status.PortStatus; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.RunStatus; -import org.apache.nifi.controller.status.TransmissionStatus; -import org.apache.nifi.controller.status.history.ComponentStatusRepository; -import org.apache.nifi.controller.status.history.StatusHistoryUtil; -import org.apache.nifi.controller.tasks.ExpireFlowFiles; -import org.apache.nifi.diagnostics.SystemDiagnostics; -import org.apache.nifi.diagnostics.SystemDiagnosticsFactory; -import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.events.NodeBulletinProcessingStrategy; -import org.apache.nifi.events.VolatileBulletinRepository; -import org.apache.nifi.flowfile.FlowFilePrioritizer; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.framework.security.util.SslContextFactory; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.groups.RemoteProcessGroup; -import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; -import org.apache.nifi.groups.StandardProcessGroup; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.logging.LogLevel; -import org.apache.nifi.logging.LogRepository; -import org.apache.nifi.logging.LogRepositoryFactory; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.logging.ProcessorLogObserver; -import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.nar.NarThreadContextClassLoader; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.QueueSize; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.SimpleProcessLogger; -import org.apache.nifi.processor.StandardProcessorInitializationContext; -import org.apache.nifi.processor.StandardValidationContextFactory; -import org.apache.nifi.processor.annotation.OnAdded; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.ProvenanceEventRepository; -import org.apache.nifi.provenance.ProvenanceEventType; -import org.apache.nifi.provenance.StandardProvenanceEventRecord; -import org.apache.nifi.remote.RemoteGroupPort; -import org.apache.nifi.remote.RemoteResourceManager; -import org.apache.nifi.remote.RemoteSiteListener; -import org.apache.nifi.remote.RootGroupPort; -import org.apache.nifi.remote.SocketRemoteSiteListener; -import org.apache.nifi.remote.StandardRemoteProcessGroup; -import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; -import org.apache.nifi.remote.StandardRootGroupPort; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol; -import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.EventAccess; -import org.apache.nifi.reporting.ReportingTask; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.util.ReflectionUtils; -import org.apache.nifi.web.api.dto.ConnectableDTO; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.FunnelDTO; -import org.apache.nifi.web.api.dto.LabelDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.PositionDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RelationshipDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.TemplateDTO; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.sun.jersey.api.client.ClientHandlerException; - -public class FlowController implements EventAccess, ControllerServiceProvider, Heartbeater, QueueProvider { - - // default repository implementations - public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository"; - public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository"; - public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository"; - public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager"; - public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository"; - - public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds"; - public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds"; - public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10; - public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures - - public static final String ROOT_GROUP_ID_ALIAS = "root"; - public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow"; - - private final AtomicInteger maxTimerDrivenThreads; - private final AtomicInteger maxEventDrivenThreads; - private final AtomicReference<FlowEngine> timerDrivenEngineRef; - private final AtomicReference<FlowEngine> eventDrivenEngineRef; - - private final ContentRepository contentRepository; - private final FlowFileRepository flowFileRepository; - private final FlowFileEventRepository flowFileEventRepository; - private final ProvenanceEventRepository provenanceEventRepository; - private final VolatileBulletinRepository bulletinRepository; - private final StandardProcessScheduler processScheduler; - private final TemplateManager templateManager; - private final SnippetManager snippetManager; - private final long gracefulShutdownSeconds; - private final ExtensionManager extensionManager; - private final NiFiProperties properties; - private final SSLContext sslContext; - private final RemoteSiteListener externalSiteListener; - private final AtomicReference<CounterRepository> counterRepositoryRef; - private final AtomicBoolean initialized = new AtomicBoolean(false); - private final ControllerServiceProvider controllerServiceProvider; - private final UserService userService; - private final EventDrivenWorkerQueue eventDrivenWorkerQueue; - private final ComponentStatusRepository componentStatusRepository; - private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started - private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>(); - - // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may - // change while the instance is running. We do this because we want to generate heartbeats even if we - // are unable to obtain a read lock on the entire FlowController. - private final AtomicReference<HeartbeatBean> heartbeatBeanRef = new AtomicReference<>(); - private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false); - - private final Integer remoteInputSocketPort; - private final Boolean isSiteToSiteSecure; - private Integer clusterManagerRemoteSitePort = null; - private Boolean clusterManagerRemoteSiteCommsSecure = null; - - private ProcessGroup rootGroup; - private final List<Connectable> startConnectablesAfterInitialization; - private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization; - - /** - * true if controller is configured to operate in a clustered environment - */ - private final boolean configuredForClustering; - - /** - * the time to wait between heartbeats - */ - private final int heartbeatDelaySeconds; - - /** - * The sensitive property string encryptor * - */ - private final StringEncryptor encryptor; - - /** - * cluster protocol sender - */ - private final NodeProtocolSender protocolSender; - - private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks"); - private final ContentClaimManager contentClaimManager = new StandardContentClaimManager(); - - // guarded by rwLock - /** - * timer to periodically send heartbeats to the cluster - */ - private ScheduledFuture<?> bulletinFuture; - private ScheduledFuture<?> heartbeatGeneratorFuture; - private ScheduledFuture<?> heartbeatSenderFuture; - - // guarded by FlowController lock - /** - * timer task to generate heartbeats - */ - private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null); - - private AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber; - - // guarded by rwLock - /** - * the node identifier; - */ - private NodeIdentifier nodeId; - - // guarded by rwLock - /** - * true if controller is connected or trying to connect to the cluster - */ - private boolean clustered; - private String clusterManagerDN; - - // guarded by rwLock - /** - * true if controller is the primary of the cluster - */ - private boolean primary; - - // guarded by rwLock - /** - * true if connected to a cluster - */ - private boolean connected; - - // guarded by rwLock - private String instanceId; - - private volatile boolean shutdown = false; - - private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); - - private FlowFileSwapManager flowFileSwapManager; // guarded by read/write lock - - private static final Logger LOG = LoggerFactory.getLogger(FlowController.class); - private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"); - - public static FlowController createStandaloneInstance( - final FlowFileEventRepository flowFileEventRepo, - final NiFiProperties properties, - final UserService userService, - final StringEncryptor encryptor) { - return new FlowController( - flowFileEventRepo, - properties, - userService, - encryptor, - /* configuredForClustering */ false, - /* NodeProtocolSender */ null); - } - - public static FlowController createClusteredInstance( - final FlowFileEventRepository flowFileEventRepo, - final NiFiProperties properties, - final UserService userService, - final StringEncryptor encryptor, - final NodeProtocolSender protocolSender) { - final FlowController flowController = new FlowController( - flowFileEventRepo, - properties, - userService, - encryptor, - /* configuredForClustering */ true, - /* NodeProtocolSender */ protocolSender); - - flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure()); - - return flowController; - } - - private FlowController( - final FlowFileEventRepository flowFileEventRepo, - final NiFiProperties properties, - final UserService userService, - final StringEncryptor encryptor, - final boolean configuredForClustering, - final NodeProtocolSender protocolSender) { - - maxTimerDrivenThreads = new AtomicInteger(10); - maxEventDrivenThreads = new AtomicInteger(5); - - this.encryptor = encryptor; - this.properties = properties; - sslContext = SslContextFactory.createSslContext(properties, false); - extensionManager = new ExtensionManager(); - controllerServiceProvider = new StandardControllerServiceProvider(); - - timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); - eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process")); - - final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager); - flowFileRepository = flowFileRepo; - flowFileEventRepository = flowFileEventRepo; - counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); - - bulletinRepository = new VolatileBulletinRepository(); - nodeBulletinSubscriber = new AtomicReference<>(); - - try { - this.provenanceEventRepository = createProvenanceRepository(properties); - this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository)); - - this.contentRepository = createContentRepository(properties); - } catch (final Exception e) { - throw new RuntimeException("Unable to create Provenance Repository", e); - } - - processScheduler = new StandardProcessScheduler(this, this, encryptor); - eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); - - final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository); - processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( - eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); - - final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); - final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); - processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); - processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); - processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); - processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); - - startConnectablesAfterInitialization = new ArrayList<>(); - startRemoteGroupPortsAfterInitialization = new ArrayList<>(); - this.userService = userService; - - final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD); - long shutdownSecs; - try { - shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal); - if (shutdownSecs < 1) { - shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS; - } - } catch (final NumberFormatException nfe) { - shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS; - } - gracefulShutdownSeconds = shutdownSecs; - - remoteInputSocketPort = properties.getRemoteInputPort(); - isSiteToSiteSecure = properties.isSiteToSiteSecure(); - - if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) { - throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured"); - } - - this.configuredForClustering = configuredForClustering; - this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS); - this.protocolSender = protocolSender; - try { - this.templateManager = new TemplateManager(properties.getTemplateDirectory()); - } catch (IOException e) { - throw new RuntimeException(e); - } - - this.snippetManager = new SnippetManager(); - - rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor); - rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); - instanceId = UUID.randomUUID().toString(); - - if (remoteInputSocketPort == null){ - LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set"); - externalSiteListener = null; - } else if (isSiteToSiteSecure && sslContext == null) { - LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed."); - externalSiteListener = null; - } else { - // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol - RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class); - externalSiteListener = new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null); - externalSiteListener.setRootGroup(rootGroup); - } - - // Determine frequency for obtaining component status snapshots - final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); - long snapshotMillis; - try { - snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); - } - - componentStatusRepository = createComponentStatusRepository(); - timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - componentStatusRepository.capture(getControllerStatus()); - } - }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); - - heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false)); - } - - private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) { - final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION); - if (implementationClassName == null) { - throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION); - } - - try { - final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class); - synchronized (created) { - created.initialize(contentClaimManager); - } - return created; - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - private static FlowFileSwapManager createSwapManager(final NiFiProperties properties) { - final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION); - if (implementationClassName == null) { - return null; - } - - try { - return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) { - return new EventReporter() { - @Override - public void reportEvent(final Severity severity, final String category, final String message) { - final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); - bulletinRepository.addBulletin(bulletin); - } - }; - } - - public void initializeFlow() throws IOException { - writeLock.lock(); - try { - flowFileSwapManager = createSwapManager(properties); - - long maxIdFromSwapFiles = -1L; - if (flowFileSwapManager != null) { - if (flowFileRepository.isVolatile()) { - flowFileSwapManager.purge(); - } else { - maxIdFromSwapFiles = flowFileSwapManager.recoverSwappedFlowFiles(this, contentClaimManager); - } - } - - flowFileRepository.loadFlowFiles(this, maxIdFromSwapFiles + 1); - - // now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the - // ContentRepository to purge superfluous files - contentRepository.cleanup(); - - if (flowFileSwapManager != null) { - flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository)); - } - - if (externalSiteListener != null) { - externalSiteListener.start(); - } - - timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - updateRemoteProcessGroups(); - } catch (final Throwable t) { - LOG.warn("Unable to update Remote Process Groups due to " + t); - if (LOG.isDebugEnabled()) { - LOG.warn("", t); - } - } - } - }, 0L, 30L, TimeUnit.SECONDS); - - initialized.set(true); - } finally { - writeLock.unlock(); - } - } - - /** - * <p> - * Causes any processors that were added to the flow with a 'delayStart' - * flag of true to now start - * </p> - */ - public void startDelayed() { - writeLock.lock(); - try { - LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size())); - for (final Connectable connectable : startConnectablesAfterInitialization) { - if (connectable.getScheduledState() == ScheduledState.DISABLED) { - continue; - } - - try { - if (connectable instanceof ProcessorNode) { - connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); - } else { - startConnectable(connectable); - } - } catch (final Throwable t) { - LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); - } - } - - startConnectablesAfterInitialization.clear(); - - int startedTransmitting = 0; - for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) { - try { - remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort); - startedTransmitting++; - } catch (final Throwable t) { - LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t}); - } - } - - LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting); - startRemoteGroupPortsAfterInitialization.clear(); - } finally { - writeLock.unlock(); - } - } - - private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { - final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION); - if (implementationClassName == null) { - throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION); - } - - try { - final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class); - synchronized (contentRepo) { - contentRepo.initialize(contentClaimManager); - } - return contentRepo; - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - private ProvenanceEventRepository createProvenanceRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { - final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION); - if (implementationClassName == null) { - throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS); - } - - try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceEventRepository.class); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - private ComponentStatusRepository createComponentStatusRepository() { - final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); - if (implementationClassName == null) { - throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); - } - - try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - /** - * Creates a connection between two Connectable objects. - * - * @param id required ID of the connection - * @param name the name of the connection, or <code>null</code> to leave the - * connection unnamed - * @param source required source - * @param destination required destination - * @param relationshipNames required collection of relationship names - * @return - * - * @throws NullPointerException if the ID, source, destination, or set of - * relationships is null. - * @throws IllegalArgumentException if <code>relationships</code> is an - * empty collection - */ - public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) { - final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler); - - final List<Relationship> relationships = new ArrayList<>(); - for (final String relationshipName : requireNonNull(relationshipNames)) { - relationships.add(new Relationship.Builder().name(relationshipName).build()); - } - - return builder.id(requireNonNull(id).intern()).name(name == null ? null : name.intern()).relationships(relationships).source(requireNonNull(source)).destination(destination).build(); - } - - /** - * Creates a new Label - * - * @param id - * @param text - * @return - * @throws NullPointerException if either argument is null - */ - public Label createLabel(final String id, final String text) { - return new StandardLabel(requireNonNull(id).intern(), text); - } - - /** - * Creates a funnel - * - * @param id - * @return - */ - public Funnel createFunnel(final String id) { - return new StandardFunnel(id.intern(), null, processScheduler); - } - - /** - * Creates a Port to use as an Input Port for a Process Group - * - * @param id - * @param name - * @return - * @throws NullPointerException if the ID or name is not unique - * @throws IllegalStateException if an Input Port already exists with the - * same name or id. - */ - public Port createLocalInputPort(String id, String name) { - id = requireNonNull(id).intern(); - name = requireNonNull(name).intern(); - verifyPortIdDoesNotExist(id); - return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, processScheduler); - } - - /** - * Creates a Port to use as an Output Port for a Process Group - * - * @param id - * @param name - * @return - * @throws NullPointerException if the ID or name is not unique - * @throws IllegalStateException if an Input Port already exists with the - * same name or id. - */ - public Port createLocalOutputPort(String id, String name) { - id = requireNonNull(id).intern(); - name = requireNonNull(name).intern(); - verifyPortIdDoesNotExist(id); - return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, processScheduler); - } - - /** - * Creates a ProcessGroup with the given ID - * - * @param id - * @return - * @throws NullPointerException if the argument is null - */ - public ProcessGroup createProcessGroup(final String id) { - return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor); - } - - /** - * <p> - * Creates a new ProcessorNode with the given type and identifier and initializes it invoking the - * methods annotated with {@link OnAdded}. - * </p> - * - * @param type - * @param id - * @return - * @throws NullPointerException if either arg is null - * @throws ProcessorInstantiationException if the processor cannot be - * instantiated for any reason - */ - public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException { - return createProcessor(type, id, true); - } - - /** - * <p> - * Creates a new ProcessorNode with the given type and identifier and optionally initializes it. - * </p> - * - * @param type the fully qualified Processor class name - * @param id the unique ID of the Processor - * @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true}, - * will invoke methods annotated with the {@link OnAdded} annotation. - * @return - * @throws NullPointerException if either arg is null - * @throws ProcessorInstantiationException if the processor cannot be - * instantiated for any reason - */ - public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException { - id = id.intern(); - final Processor processor = instantiateProcessor(type, id); - final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); - final ProcessorNode procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider); - - final LogRepository logRepository = LogRepositoryFactory.getRepository(id); - logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); - - if ( firstTimeAdded ) { - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); - } catch (final Exception e) { - logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); - throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e); - } - } - - return procNode; - } - - private Processor instantiateProcessor(final String type, final String identifier) throws ProcessorInstantiationException { - Processor processor; - - final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); - try { - final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); - final Class<?> rawClass; - if (detectedClassLoaderForType == null) { - // try to find from the current class loader - rawClass = Class.forName(type); - } else { - // try to find from the registered classloader for that type - rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); - } - - Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); - final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class); - processor = processorClass.newInstance(); - final ProcessorLog processorLogger = new SimpleProcessLogger(identifier, processor); - final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, processorLogger, this); - processor.initialize(ctx); - return processor; - } catch (final Throwable t) { - throw new ProcessorInstantiationException(type, t); - } finally { - if (ctxClassLoader != null) { - Thread.currentThread().setContextClassLoader(ctxClassLoader); - } - } - } - - /** - * @return the ExtensionManager used for instantiating Processors, - * Prioritizers, etc. - */ - public ExtensionManager getExtensionManager() { - return extensionManager; - } - - public String getInstanceId() { - readLock.lock(); - try { - return instanceId; - } finally { - readLock.unlock(); - } - } - - /** - * Gets the BulletinRepository for storing and retrieving Bulletins. - * - * @return - */ - public BulletinRepository getBulletinRepository() { - return bulletinRepository; - } - - public SnippetManager getSnippetManager() { - return snippetManager; - } - - /** - * Creates a Port to use as an Input Port for the root Process Group, which - * is used for Site-to-Site communications - * - * @param id - * @param name - * @return - * @throws NullPointerException if the ID or name is not unique - * @throws IllegalStateException if an Input Port already exists with the - * same name or id. - */ - public Port createRemoteInputPort(String id, String name) { - id = requireNonNull(id).intern(); - name = requireNonNull(name).intern(); - verifyPortIdDoesNotExist(id); - return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); - } - - /** - * Creates a Port to use as an Output Port for the root Process Group, which - * is used for Site-to-Site communications and will queue flow files waiting - * to be delivered to remote instances - * - * @param id - * @param name - * @return - * @throws NullPointerException if the ID or name is not unique - * @throws IllegalStateException if an Input Port already exists with the - * same name or id. - */ - public Port createRemoteOutputPort(String id, String name) { - id = requireNonNull(id).intern(); - name = requireNonNull(name).intern(); - verifyPortIdDoesNotExist(id); - return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); - } - - /** - * Creates a new Remote Process Group with the given ID that points to the - * given URI - * - * @param id - * @param uri - * @return - * - * @throws NullPointerException if either argument is null - * @throws IllegalArgumentException if <code>uri</code> is not a valid URI. - */ - public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) { - return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext); - } - - /** - * Verifies that no output port exists with the given id or name. If this - * does not hold true, throws an IllegalStateException - * - * @param id - * @throws IllegalStateException - */ - private void verifyPortIdDoesNotExist(final String id) { - Port port = rootGroup.findOutputPort(id); - if (port != null) { - throw new IllegalStateException("An Input Port already exists with ID " + id); - } - port = rootGroup.findInputPort(id); - if (port != null) { - throw new IllegalStateException("An Input Port already exists with ID " + id); - } - } - - /** - * @return the name of this controller, which is also the name of the Root - * Group. - */ - public String getName() { - readLock.lock(); - try { - return rootGroup.getName(); - } finally { - readLock.unlock(); - } - } - - /** - * Sets the name for the Root Group, which also changes the name for the - * controller. - * - * @param name - */ - public void setName(final String name) { - readLock.lock(); - try { - rootGroup.setName(name); - } finally { - readLock.unlock(); - } - } - - /** - * Gets the comments of this controller, which is also the comment of the - * Root Group. - * - * @return - */ - public String getComments() { - readLock.lock(); - try { - return rootGroup.getComments(); - } finally { - readLock.unlock(); - } - } - - /** - * Sets the comment for the Root Group, which also changes the comment for - * the controller. - * - * @param comments - */ - public void setComments(final String comments) { - readLock.lock(); - try { - rootGroup.setComments(comments); - } finally { - readLock.unlock(); - } - } - - /** - * @return <code>true</code> if the scheduling engine for this controller - * has been terminated. - */ - public boolean isTerminated() { - this.readLock.lock(); - try { - return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated()); - } finally { - this.readLock.unlock(); - } - } - - /** - * Triggers the controller to begin shutdown, stopping all processors and - * terminating the scheduling engine. After calling this method, the - * {@link #isTerminated()} method will indicate whether or not the shutdown - * has finished. - * - * @param kill if <code>true</code>, attempts to stop all active threads, - * but makes no guarantee that this will happen - * - * @throws IllegalStateException if the controller is already stopped or - * currently in the processor of stopping - */ - public void shutdown(final boolean kill) { - this.shutdown = true; - stopAllProcessors(); - - writeLock.lock(); - try { - if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) { - throw new IllegalStateException("Controller already stopped or still stopping..."); - } - - if (kill) { - this.timerDrivenEngineRef.get().shutdownNow(); - this.eventDrivenEngineRef.get().shutdownNow(); - LOG.info("Initiated immediate shutdown of flow controller..."); - } else { - this.timerDrivenEngineRef.get().shutdown(); - this.eventDrivenEngineRef.get().shutdown(); - LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds"); - } - - clusterTaskExecutor.shutdown(); - - // Trigger any processors' methods marked with @OnShutdown to be called - rootGroup.shutdown(); - - try { - this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS); - this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS); - } catch (final InterruptedException ie) { - LOG.info("Interrupted while waiting for controller termination."); - } - - try { - flowFileRepository.close(); - } catch (final Throwable t) { - LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t}); - } - - if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) { - LOG.info("Controller has been terminated successfully."); - } else { - LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop. Might need to kill the program manually."); - } - - if (externalSiteListener != null) { - externalSiteListener.stop(); - } - - if (flowFileSwapManager != null) { - flowFileSwapManager.shutdown(); - } - - if ( processScheduler != null ) { - processScheduler.shutdown(); - } - - if ( contentRepository != null ) { - contentRepository.shutdown(); - } - - if ( provenanceEventRepository != null ) { - try { - provenanceEventRepository.close(); - } catch (final IOException ioe) { - LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString()); - if ( LOG.isDebugEnabled() ) { - LOG.warn("", ioe); - } - } - } - } finally { - writeLock.unlock(); - } - } - - /** - * Serializes the current state of the controller to the given OutputStream - * - * @param serializer - * @param os - * @throws FlowSerializationException if serialization of the flow fails for - * any reason - */ - public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { - readLock.lock(); - try { - serializer.serialize(this, os); - } finally { - readLock.unlock(); - } - } - - /** - * Synchronizes this controller with the proposed flow. - * - * For more details, see - * {@link FlowSynchronizer#sync(FlowController, DataFlow)}. - * - * @param synchronizer - * @param dataFlow the flow to load the controller with. If the flow is null - * or zero length, then the controller must not have a flow or else an - * UninheritableFlowException will be thrown. - * - * @throws FlowSerializationException if proposed flow is not a valid flow - * configuration file - * @throws UninheritableFlowException if the proposed flow cannot be loaded - * by the controller because in doing so would risk orphaning flow files - * @throws FlowSynchronizationException if updates to the controller failed. - * If this exception is thrown, then the controller should be considered - * unsafe to be used - */ - public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow) - throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { - writeLock.lock(); - try { - LOG.debug("Synchronizing controller with proposed flow"); - synchronizer.sync(this, dataFlow, encryptor); - LOG.info("Successfully synchronized controller with proposed flow"); - } finally { - writeLock.unlock(); - } - } - - /** - * @return the currently configured maximum number of threads that can be - * used for executing processors at any given time. - */ - public int getMaxTimerDrivenThreadCount() { - return maxTimerDrivenThreads.get(); - } - - public int getMaxEventDrivenThreadCount() { - return maxEventDrivenThreads.get(); - } - - public void setMaxTimerDrivenThreadCount(final int maxThreadCount) { - writeLock.lock(); - try { - setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads); - } finally { - writeLock.unlock(); - } - } - - public void setMaxEventDrivenThreadCount(final int maxThreadCount) { - writeLock.lock(); - try { - setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads); - processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount); - } finally { - writeLock.unlock(); - } - } - - /** - * Updates the number of threads that can be simultaneously used for - * executing processors. - * - * @param maxThreadCount - * - * This method must be called while holding the write lock! - */ - private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) { - if (maxThreadCount < 1) { - throw new IllegalArgumentException(); - } - - maxThreads.getAndSet(maxThreadCount); - if (null != engine && engine.getCorePoolSize() < maxThreadCount) { - engine.setCorePoolSize(maxThreads.intValue()); - } - } - - /** - * @return the ID of the root group - */ - public String getRootGroupId() { - readLock.lock(); - try { - return rootGroup.getIdentifier(); - } finally { - readLock.unlock(); - } - } - - /** - * Sets the root group to the given group - * - * @param group the ProcessGroup that is to become the new Root Group - * - * @throws IllegalArgumentException if the ProcessGroup has a parent - * @throws IllegalStateException if the FlowController does not know about - * the given process group - */ - void setRootGroup(final ProcessGroup group) { - if (requireNonNull(group).getParent() != null) { - throw new IllegalArgumentException("A ProcessGroup that has a parent cannot be the Root Group"); - } - - writeLock.lock(); - try { - rootGroup = group; - - if (externalSiteListener != null) { - externalSiteListener.setRootGroup(group); - } - - // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected)); - } finally { - writeLock.unlock(); - } - } - - public SystemDiagnostics getSystemDiagnostics() { - final SystemDiagnosticsFactory factory = new SystemDiagnosticsFactory(); - return factory.create(flowFileRepository, contentRepository); - } - - // - // ProcessGroup access - // - /** - * Updates the process group corresponding to the specified DTO. Any field - * in DTO that is <code>null</code> (with the exception of the required ID) - * will be ignored. - * - * @param dto - * @return a fully-populated DTO representing the newly updated ProcessGroup - * @throws ProcessorInstantiationException - * - * @throws IllegalStateException if no process group can be found with the - * ID of DTO or with the ID of the DTO's parentGroupId, if the template ID - * specified is invalid, or if the DTO's Parent Group ID changes but the - * parent group has incoming or outgoing connections - * - * @throws NullPointerException if the DTO or its ID is null - */ - public void updateProcessGroup(final ProcessGroupDTO dto) throws ProcessorInstantiationException { - final ProcessGroup group = lookupGroup(requireNonNull(dto).getId()); - - final String name = dto.getName(); - final PositionDTO position = dto.getPosition(); - final String comments = dto.getComments(); - - if (name != null) { - group.setName(name); - } - if (position != null) { - group.setPosition(toPosition(position)); - } - if (comments != null) { - group.setComments(comments); - } - } - - // - // Template access - // - /** - * Adds a template to this controller. The contents of this template must be - * part of the current flow. This is going create a template based on a - * snippet of this flow. - * - * @param dto - * @return a copy of the given DTO - * @throws IOException if an I/O error occurs when persisting the Template - * @throws NullPointerException if the DTO is null - * @throws IllegalArgumentException if does not contain all required - * information, such as the template name or a processor's configuration - * element - */ - public Template addTemplate(final TemplateDTO dto) throws IOException { - return templateManager.addTemplate(dto); - } - - /** - * Removes all templates from this controller - * - * @throws IOException - */ - public void clearTemplates() throws IOException { - templateManager.clear(); - } - - /** - * Imports the specified template into this controller. The contents of this - * template may have come from another NiFi instance. - * - * @param dto - * @return - * @throws IOException - */ - public Template importTemplate(final TemplateDTO dto) throws IOException { - return templateManager.importTemplate(dto); - } - - /** - * Returns the template with the given ID, or <code>null</code> if no - * template exists with the given ID. - * - * @param id - * @return - */ - public Template getTemplate(final String id) { - return templateManager.getTemplate(id); - } - - public TemplateManager getTemplateManager() { - return templateManager; - } - - /** - * Returns all templates that this controller knows about. - * - * @return - */ - public Collection<Template> getTemplates() { - return templateManager.getTemplates(); - } - - /** - * Removes the template with the given ID. - * - * @param id the ID of the template to remove - * @throws NullPointerException if the argument is null - * @throws IllegalStateException if no template exists with the given ID - * @throws IOException if template could not be removed - */ - public void removeTemplate(final String id) throws IOException, IllegalStateException { - templateManager.removeTemplate(id); - } - - private Position toPosition(final PositionDTO dto) { - return new Position(dto.getX(), dto.getY()); - } - - // - // Snippet - // - /** - * Creates an instance of the given snippet and adds the components to the - * given group - * - * @param group - * @param dto - * - * @throws NullPointerException if either argument is null - * @throws IllegalStateException if the snippet is not valid because a - * component in the snippet has an ID that is not unique to this flow, or - * because it shares an Input Port or Output Port at the root level whose - * name already exists in the given ProcessGroup, or because the Template - * contains a Processor or a Prioritizer whose class is not valid within - * this instance of NiFi. - * @throws ProcessorInstantiationException if unable to instantiate a - * processor - */ - public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException { - writeLock.lock(); - try { - validateSnippetContents(requireNonNull(group), dto); - - // - // Instantiate the labels - // - for (final LabelDTO labelDTO : dto.getLabels()) { - final Label label = createLabel(labelDTO.getId(), labelDTO.getLabel()); - label.setPosition(toPosition(labelDTO.getPosition())); - if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) { - label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); - } - - // TODO: Update the label's "style" - group.addLabel(label); - } - - // - // Instantiate the funnels - for (final FunnelDTO funnelDTO : dto.getFunnels()) { - final Funnel funnel = createFunnel(funnelDTO.getId()); - funnel.setPosition(toPosition(funnelDTO.getPosition())); - group.addFunnel(funnel); - } - - // - // Instantiate Input Ports & Output Ports - // - for (final PortDTO portDTO : dto.getInputPorts()) { - final Port inputPort; - if (group.isRootGroup()) { - inputPort = createRemoteInputPort(portDTO.getId(), portDTO.getName()); - inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount()); - if (portDTO.getGroupAccessControl() != null) { - ((RootGroupPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl()); - } - if (portDTO.getUserAccessControl() != null) { - ((RootGroupPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl()); - } - } else { - inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName()); - } - - inputPort.setPosition(toPosition(portDTO.getPosition())); - inputPort.setProcessGroup(group); - inputPort.setComments(portDTO.getComments()); - group.addInputPort(inputPort); - } - - for (final PortDTO portDTO : dto.getOutputPorts()) { - final Port outputPort; - if (group.isRootGroup()) { - outputPort = createRemoteOutputPort(portDTO.getId(), portDTO.getName()); - outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount()); - if (portDTO.getGroupAccessControl() != null) { - ((RootGroupPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl()); - } - if (portDTO.getUserAccessControl() != null) { - ((RootGroupPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl()); - } - } else { - outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName()); - } - - outputPort.setPosition(toPosition(portDTO.getPosition())); - outputPort.setProcessGroup(group); - outputPort.setComments(portDTO.getComments()); - group.addOutputPort(outputPort); - } - - // - // Instantiate the processors - // - for (final ProcessorDTO processorDTO : dto.getProcessors()) { - final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId()); - - procNode.setPosition(toPosition(processorDTO.getPosition())); - procNode.setProcessGroup(group); - - final ProcessorConfigDTO config = processorDTO.getConfig(); - procNode.setComments(config.getComments()); - if (config.isLossTolerant() != null) { - procNode.setLossTolerant(config.isLossTolerant()); - } - procNode.setName(processorDTO.getName()); - - procNode.setYieldPeriod(config.getYieldDuration()); - procNode.setPenalizationPeriod(config.getPenaltyDuration()); - procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel())); - procNode.setAnnotationData(config.getAnnotationData()); - procNode.setStyle(processorDTO.getStyle()); - - if (config.getRunDurationMillis() != null) { - procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS); - } - - if (config.getSchedulingStrategy() != null) { - procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); - } - - // ensure that the scheduling strategy is set prior to these values - procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); - procNode.setScheduldingPeriod(config.getSchedulingPeriod()); - - final Set<Relationship> relationships = new HashSet<>(); - if (processorDTO.getRelationships() != null) { - for (final RelationshipDTO rel : processorDTO.getRelationships()) { - if (rel.isAutoTerminate()) { - relationships.add(procNode.getRelationship(rel.getName())); - } - } - procNode.setAutoTerminatedRelationships(relationships); - } - - if (config.getProperties() != null) { - for (Map.Entry<String, String> entry : config.getProperties().entrySet()) { - if (entry.getValue() != null) { - procNode.setProperty(entry.getKey(), entry.getValue()); - } - } - } - - group.addProcessor(procNode); - } - - // - // Instantiate Remote Process Groups - // - for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) { - final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri()); - remoteGroup.setComments(remoteGroupDTO.getComments()); - remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition())); - remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout()); - remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration()); - remoteGroup.setProcessGroup(group); - - // set the input/output ports - if (remoteGroupDTO.getContents() != null) { - final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents(); - - // ensure there input ports - if (contents.getInputPorts() != null) { - remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts())); - } - - // ensure there are output ports - if (contents.getOutputPorts() != null) { - remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts())); - } - } - - group.addRemoteProcessGroup(remoteGroup); - } - - // - // Instantiate ProcessGroups - // - for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) { - final ProcessGroup childGroup = createProcessGroup(groupDTO.getId()); - childGroup.setParent(group); - childGroup.setPosition(toPosition(groupDTO.getPosition())); - childGroup.setComments(groupDTO.getComments()); - childGroup.setName(groupDTO.getName()); - group.addProcessGroup(childGroup); - - final FlowSnippetDTO contents = groupDTO.getContents(); - - // we want this to be recursive, so we will create a new template that contains only - // the contents of this child group and recursively call ourselves. - final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO(); - childTemplateDTO.setConnections(contents.getConnections()); - childTemplateDTO.setInputPorts(contents.getInputPorts()); - childTemplateDTO.setLabels(contents.getLabels()); - childTemplateDTO.setOutputPorts(contents.getOutputPorts()); - childTemplateDTO.setProcessGroups(contents.getProcessGroups()); - childTemplateDTO.setProcessors(contents.getProcessors()); - childTemplateDTO.setFunnels(contents.getFunnels()); - childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups()); - instantiateSnippet(childGroup, childTemplateDTO); - } - - // - // Instantiate Connections - // - for (final ConnectionDTO connectionDTO : dto.getConnections()) { - final ConnectableDTO sourceDTO = connectionDTO.getSource(); - final ConnectableDTO destinationDTO = connectionDTO.getDestination(); - final Connectable source; - final Connectable destination; - - // locate the source and destination connectable. if this is a remote port - // we need to locate the remote process groups. otherwise we need to - // find the connectable given its parent group. - // NOTE: (getConnectable returns ANY connectable, when the parent is - // not this group only input ports or output ports should be returned. if something - // other than a port is returned, an exception will be thrown when adding the - // connection below.) - // see if the source connectable is a remote port - if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) { - final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId()); - source = remoteGroup.getOutputPort(sourceDTO.getId()); - } else { - final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId()); - source = sourceGroup.getConnectable(sourceDTO.getId()); - } - - // see if the destination connectable is a remote port - if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) { - final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId()); - destination = remoteGroup.getInputPort(destinationDTO.getId()); - } else { - final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId()); - destination = destinationGroup.getConnectable(destinationDTO.getId()); - } - - // determine the selection relationships for this connection - final Set<String> relationships = new HashSet<>(); - if (connectionDTO.getSelectedRelationships() != null) { - relationships.addAll(connectionDTO.getSelectedRelationships()); - } - - final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships); - - if (connectionDTO.getBends() != null) { - final List<Position> bendPoints = new ArrayList<>(); - for (final PositionDTO bend : connectionDTO.getBends()) { - bendPoints.add(new Position(bend.getX(), bend.getY())); - } - connection.setBendPoints(bendPoints); - } - - final FlowFileQueue queue = connection.getFlowFileQueue(); - queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold()); - queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold()); - queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration()); - - final List<String> prioritizers = connectionDTO.getPrioritizers(); - if (prioritizers != null) { - final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers); - final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>(); - for (final String className : newPrioritizersClasses) { - try { - newPrioritizers.add(createPrioritizer(className)); - } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e); - } - } - queue.setPriorities(newPrioritizers); - } - - connection.setProcessGroup(group); - group.addConnection(connection); - } - } finally { - writeLock.unlock(); - } - } - - /** - * Converts a set of ports into a set of remote process group ports. - * - * @param ports - * @return - */ - private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) { - Set<RemoteProcessGroupPortDescriptor> remotePorts = null; - if (ports != null) { - remotePorts = new LinkedHashSet<>(ports.size()); - for (RemoteProcessGroupPortDTO port : ports) { - final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); - descriptor.setId(port.getId()); - descriptor.setName(port.getName()); - descriptor.setComments(port.getComments()); - descriptor.setTargetRunning(port.isTargetRunning()); - descriptor.setConnected(port.isConnected()); - descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount()); - descriptor.setTransmitting(port.isTransmitting()); - descriptor.setUseCompression(port.getUseCompression()); - remotePorts.add(descriptor); - } - } - return remotePorts; - } - - /** - * Returns the parent of the specified Connectable. This only considers this - * group and any direct child sub groups. - * - * @param parentGroupId - * @return - */ - private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) { - if (areGroupsSame(group.getIdentifier(), parentGroupId)) { - return group; - } else { - return group.getProcessGroup(parentGroupId); - } - } - - /** - * <p> - * Verifies that the given DTO is valid, according to the following: - * - * <ul> - * <li>None of the ID's in any component of the DTO can be used in this - * flow.</li> - * <li>The ProcessGroup to which the template's contents will be added must - * not contain any InputPort or OutputPort with the same name as one of the - * corresponding components in the root level of the template.</li> - * <li>All Processors' classes must exist in this instance.</li> - * <li>All Flow File Prioritizers' classes must exist in this instance.</li> - * </ul> - * </p> - * - * <p> - * If any of the above statements does not hold true, an - * {@link IllegalStateException} or a - * {@link ProcessorInstantiationException} will be thrown. - * </p> - * - * @param group - * @param templateContents - */ - private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) { - // validate the names of Input Ports - for (final PortDTO port : templateContents.getInputPorts()) { - if (group.getInputPortByName(port.getName()) != null) { - throw new IllegalStateException("ProcessGroup already has an Input Port with name " + port.getName()); - } - } - - // validate the names of Output Ports - for (final PortDTO port : templateContents.getOutputPorts()) { - if (group.getOutputPortByName(port.getName()) != null) { - throw new IllegalStateException("ProcessGroup already has an Output Port with name " + port.getName()); - } - } - - // validate that all Processor Types and Prioritizer Types are valid - final List<String> processorClasses = new ArrayList<>(); - for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) { - processorClasses.add(c.getName()); - } - final List<String> prioritizerClasses = new ArrayList<>(); - for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) { - prioritizerClasses.add(c.getName()); - } - - final Set<ProcessorDTO> allProcs = new HashSet<>(); - final Set<ConnectionDTO> allConns = new HashSet<>(); - allProcs.addAll(templateContents.getProcessors()); - allConns.addAll(templateContents.getConnections()); - for (final ProcessGroupDTO childGroup : templateContents.getProcessGroups()) { - allProcs.addAll(findAllProcessors(childGroup)); - allConns.addAll(findAllConnections(childGroup)); - } - - for (final ProcessorDTO proc : allProcs) { - if (!processorClasses.contains(proc.getType())) { - throw new IllegalStateException("Invalid Processor Type: " + proc.getType()); - } - } - - for (final ConnectionDTO conn : allConns) { - final List<String> prioritizers = conn.getPrioritizers(); - if (prioritizers != null) { - for (final String prioritizer : prioritizers) { - if (!prioritizerClasses.contains(prioritizer)) { - throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer); - } - } - } - } - } - - /** - * Recursively finds all ProcessorDTO's - * - * @param group - * @return - */ - private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) { - final Set<ProcessorDTO> procs = new HashSet<>(); - for (final ProcessorDTO dto : group.getContents().getProcessors()) { - procs.add(dto); - } - - for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) { - procs.addAll(findAllProcessors(childGroup)); - } - return procs; - } - - /** - * Recursively finds all ConnectionDTO's - * - * @param group - * @return - */ - private Set<ConnectionDTO> findAllConnections(final ProcessGroupDTO group) { - final Set<ConnectionDTO> conns = new HashSet<>(); - for (final ConnectionDTO dto : group.getContents().getConnections()) { - conns.add(dto); - } - - for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) { - conns.addAll(findAllConnections(childGroup)); - } - return conns; - } - - // - // Processor access - // - /** - * Indicates whether or not the two ID's point to the same ProcessGroup. If - * either id is null, will return <code>false</code. - * - * @param id1 - * @param id2 - * @return - */ - public boolean areGroupsSame(final String id1, final String id2) { - if (id1 == null || id2 == null) { - return false; - } else if (id1.equals(id2)) { - return true; - } else { - final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1); - final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2); - return (comparable1.equals(comparable2)); - } - } - - public FlowFilePrioritizer createPrioritizer(final String type) throws InstantiationException, IllegalAccessException, ClassNotFoundException { - FlowFilePrioritizer prioritizer; - - final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); - try { - final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); - final Class<?> rawClass; - if (detectedClassLoaderForType == null) { - // try to find from the current class loader - rawClass = Class.forName(type); - } else { - // try to find from the registered classloader for that type - rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); - } - - Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); - final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class); - final Object processorObj = prioritizerClass.newInstance(); - prioritizer = prioritizerClass.cast(processorObj); - - return prioritizer; - } finally { - if (ctxClassLoader != null) { - Thread.currentThread().setContextClassLoader(ctxClassLoader); - } - } - } - - // - // InputPort access - // - public PortDTO updateInputPort(final String parentGroupId, final PortDTO dto) { - final ProcessGroup parentGroup = lookupGroup(parentGroupId); - final Port port = parentGroup.getInputPort(dto.getId()); - if (port == null) { - throw new IllegalStateException("No Input Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId); - } - - final String name = dto.getName(); - if (dto.getPosition() != null) { - port.setPosition(toPosition(dto.getPosition())); - } - - if (name != null) { - port.setName(name); - } - - return createDTO(port); - } - - private PortDTO createDTO(final Port port) { - if (port == null) { - return null; - } - - final PortDTO dto = new PortDTO(); - dto.setId(port.getIdentifier()); - dto.setPosition(new PositionDTO(port.getPosition().getX(), port.getPosition().getY())); - dto.setName(port.getName()); - dto.setParentGroupId(port.getProcessGroup().getIdentifier()); - - return dto; - } - - // - // OutputPort access - // - public PortDTO updateOutputPort(final String parentGroupId, final PortDTO dto) { - final ProcessGroup parentGroup = lookupGroup(parentGroupId); - final Port port = parentGroup.getOutputPort(dto.getId()); - if (port == null) { - throw new IllegalStateException("No Output Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId); - } - - final String name = dto.getName(); - if (name != null) { - port.setName(name); - } - - if (dto.getPosition() != null) { - port.setPosition(toPosition(dto.getPosition())); - } - - return createDTO(port); - } - - // - // Processor/Prioritizer/Filter Class Access - // - @SuppressWarnings("rawtypes") - public Set<Class> getFlowFileProcessorClasses() { - return ExtensionManager.getExtensions(Processor.class); - } - - @SuppressWarnings("rawtypes") - public Set<Class> getFlowFileComparatorClasses() { - return ExtensionManager.getExtensions(FlowFilePrioritizer.class); - } - - /** - * Returns the ProcessGroup with the given ID - * - * @param id - * @return the process group or null if not group is found - */ - private ProcessGroup lookupGroup(final String id) { - final ProcessGroup group = getGroup(id); - if (group == null) { - throw new IllegalStateException("No Group with ID " + id + " exists"); - } - return group; - } - - /** - * Returns the ProcessGroup with the given ID - * - * @param id - * @return the process group or null if not group is found - */ - public ProcessGroup getGroup(final String id) { - requireNonNull(id); - final ProcessGroup root; - readLock.lock(); - try { - root = rootGroup; - } finally { - readLock.unlock(); - } - - final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id; - return (root == null) ? null : root.findProcessGroup(searchId); - } - - @Override - public ProcessGroupStatus getControllerStatus() { - return getGroupStatus(getRootGroupId()); - } - - public ProcessGroupStatus getGroupStatus(final String groupId) { - return getGroupStatus(groupId, getProcessorStats()); - } - - public ProcessGroupStatus getGroupStatus(final String groupId, final RepositoryStatusReport statusReport) { - final ProcessGroup group = getGroup(groupId); - return getGroupStatus(group, statusReport); - } - - public ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport) { - if (group == null) { - return null; - } - - final ProcessGroupStatus status = new ProcessGroupStatus(); - status.setId(group.getIdentifier()); - status.setName(group.getName()); - status.setCreationTimestamp(new Date().getTime()); - int activeGroupThreads = 0; - long bytesRead = 0L; - long bytesWritten = 0L; - int queuedCount = 0; - long queuedContentSize = 0L; - int flowFilesIn = 0; - long bytesIn = 0L; - int flowFilesOut = 0; - long bytesOut = 0L; - int flowFilesReceived = 0; - long bytesReceived = 0L; - int flowFilesSent = 0; - long bytesSent = 0L; - - // set status for processors - final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>(); - status.setProcessorStatus(processorStatusCollection); - for (final ProcessorNode procNode : group.getProcessors()) { - final ProcessorStatus procStat = getProcessorStatus(statusReport, procNode); - processorStatusCollection.add(procStat); - activeGroupThreads += procStat.getActiveThreadCount(); - bytesRead += procStat.getBytesRead(); - bytesWritten += procStat.getBytesWritten(); - - flowFilesReceived += procStat.getFlowFilesReceived(); - bytesReceived += procStat.getBytesReceived(); - flowFilesSent += procStat.getFlowFilesSent(); - bytesSent += procStat.getBytesSent(); - } - - // set status for local child groups - final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>(); - status.setProcessGroupStatus(localChildGroupStatusCollection); - for (final ProcessGroup childGroup : group.getProcessGroups()) { - final ProcessGroupStatus childGroupStatus = getGroupStatus(childGroup, statusReport); - localChildGroupStatusCollection.add(childGroupStatus); - activeGroupThreads += childGroupStatus.getActiveThreadCount(); - bytesRead += childGroupStatus.getBytesRead(); - bytesWritten += childGroupStatus.getBytesWritten(); - queuedCount += childGroupStatus.getQueuedCount(); - queuedContentSize += childGroupStatus.getQueuedContentSize(); - - flowFilesReceived += childGroupStatus.getFlowFilesReceived(); - bytesReceived += childGroupStatus.getBytesReceived(); - flowFilesSent += childGroupStatus.getFlowFilesSent(); - bytesSent += childGroupStatus.getBytesSent(); - } - - // set status for remote child groups - final Collection<RemoteProcessGroupStatus> remoteProcessGroupStatusCollection = new ArrayList<>(); - status.setRemoteProcessGroupStatus(remoteProcessGroupStatusCollection); - for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { - final RemoteProcessGroupStatus remoteStatus = createRemoteGroupStatus(remoteGroup, statusReport); - if (remoteStatus != null) { - remoteProcessGroupStatusCollection.add(remoteStatus); - - flowFilesReceived += remoteStatus.getReceivedCount(); - bytesReceived += remoteStatus.getReceivedContentSize(); - flowFilesSent += remoteStatus.getSentCo
<TRUNCATED>