http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java new file mode 100644 index 0000000..a0a07f2 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -0,0 +1,3534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.admin.service.UserService; +import org.apache.nifi.cluster.BulletinsPayload; +import org.apache.nifi.cluster.HeartbeatPayload; +import org.apache.nifi.cluster.protocol.DataFlow; +import org.apache.nifi.cluster.protocol.Heartbeat; +import org.apache.nifi.cluster.protocol.NodeBulletins; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.NodeProtocolSender; +import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.connectable.LocalPort; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.connectable.Size; +import org.apache.nifi.connectable.StandardConnection; +import org.apache.nifi.controller.exception.CommunicationsException; +import org.apache.nifi.controller.exception.ProcessorInstantiationException; +import org.apache.nifi.controller.exception.ProcessorLifeCycleException; +import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.label.StandardLabel; +import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; +import org.apache.nifi.controller.reporting.StandardReportingTaskNode; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.CounterRepository; +import org.apache.nifi.controller.repository.FlowFileEvent; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.QueueProvider; +import org.apache.nifi.controller.repository.RepositoryRecord; +import org.apache.nifi.controller.repository.RepositoryStatusReport; +import org.apache.nifi.controller.repository.StandardCounterRepository; +import org.apache.nifi.controller.repository.StandardFlowFileRecord; +import org.apache.nifi.controller.repository.StandardRepositoryRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.controller.repository.claim.ContentDirection; +import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.controller.repository.io.LimitedInputStream; +import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent; +import org.apache.nifi.controller.scheduling.ProcessContextFactory; +import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; +import org.apache.nifi.controller.scheduling.StandardProcessScheduler; +import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.StandardControllerServiceProvider; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.RunStatus; +import org.apache.nifi.controller.status.TransmissionStatus; +import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.controller.status.history.StatusHistoryUtil; +import org.apache.nifi.controller.tasks.ExpireFlowFiles; +import org.apache.nifi.diagnostics.SystemDiagnostics; +import org.apache.nifi.diagnostics.SystemDiagnosticsFactory; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.events.NodeBulletinProcessingStrategy; +import org.apache.nifi.events.VolatileBulletinRepository; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.framework.security.util.SslContextFactory; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; +import org.apache.nifi.groups.StandardProcessGroup; +import org.apache.nifi.io.StreamUtils; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.logging.LogRepository; +import org.apache.nifi.logging.LogRepositoryFactory; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.logging.ProcessorLogObserver; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.nar.NarThreadContextClassLoader; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.QueueSize; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.processor.StandardProcessorInitializationContext; +import org.apache.nifi.processor.StandardValidationContextFactory; +import org.apache.nifi.processor.annotation.OnAdded; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.RemoteResourceManager; +import org.apache.nifi.remote.RemoteSiteListener; +import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.remote.SocketRemoteSiteListener; +import org.apache.nifi.remote.StandardRemoteProcessGroup; +import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; +import org.apache.nifi.remote.StandardRootGroupPort; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.ReportingTask; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.ReflectionUtils; +import org.apache.nifi.web.api.dto.ConnectableDTO; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.FunnelDTO; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.dto.PositionDTO; +import org.apache.nifi.web.api.dto.ProcessGroupDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RelationshipDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.TemplateDTO; +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.jersey.api.client.ClientHandlerException; + +public class FlowController implements EventAccess, ControllerServiceProvider, Heartbeater, QueueProvider { + + // default repository implementations + public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository"; + public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository"; + public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository"; + public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager"; + public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository"; + + public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds"; + public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds"; + public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10; + public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures + + public static final String ROOT_GROUP_ID_ALIAS = "root"; + public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow"; + + private final AtomicInteger maxTimerDrivenThreads; + private final AtomicInteger maxEventDrivenThreads; + private final AtomicReference<FlowEngine> timerDrivenEngineRef; + private final AtomicReference<FlowEngine> eventDrivenEngineRef; + + private final ContentRepository contentRepository; + private final FlowFileRepository flowFileRepository; + private final FlowFileEventRepository flowFileEventRepository; + private final ProvenanceEventRepository provenanceEventRepository; + private final VolatileBulletinRepository bulletinRepository; + private final StandardProcessScheduler processScheduler; + private final TemplateManager templateManager; + private final SnippetManager snippetManager; + private final long gracefulShutdownSeconds; + private final ExtensionManager extensionManager; + private final NiFiProperties properties; + private final SSLContext sslContext; + private final RemoteSiteListener externalSiteListener; + private final AtomicReference<CounterRepository> counterRepositoryRef; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final ControllerServiceProvider controllerServiceProvider; + private final UserService userService; + private final EventDrivenWorkerQueue eventDrivenWorkerQueue; + private final ComponentStatusRepository componentStatusRepository; + private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started + private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>(); + + // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may + // change while the instance is running. We do this because we want to generate heartbeats even if we + // are unable to obtain a read lock on the entire FlowController. + private final AtomicReference<HeartbeatBean> heartbeatBeanRef = new AtomicReference<>(); + private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false); + + private final Integer remoteInputSocketPort; + private final Boolean isSiteToSiteSecure; + private Integer clusterManagerRemoteSitePort = null; + private Boolean clusterManagerRemoteSiteCommsSecure = null; + + private ProcessGroup rootGroup; + private final List<Connectable> startConnectablesAfterInitialization; + private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization; + + /** + * true if controller is configured to operate in a clustered environment + */ + private final boolean configuredForClustering; + + /** + * the time to wait between heartbeats + */ + private final int heartbeatDelaySeconds; + + /** + * The sensitive property string encryptor * + */ + private final StringEncryptor encryptor; + + /** + * cluster protocol sender + */ + private final NodeProtocolSender protocolSender; + + private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks"); + private final ContentClaimManager contentClaimManager = new StandardContentClaimManager(); + + // guarded by rwLock + /** + * timer to periodically send heartbeats to the cluster + */ + private ScheduledFuture<?> bulletinFuture; + private ScheduledFuture<?> heartbeatGeneratorFuture; + private ScheduledFuture<?> heartbeatSenderFuture; + + // guarded by FlowController lock + /** + * timer task to generate heartbeats + */ + private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null); + + private AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber; + + // guarded by rwLock + /** + * the node identifier; + */ + private NodeIdentifier nodeId; + + // guarded by rwLock + /** + * true if controller is connected or trying to connect to the cluster + */ + private boolean clustered; + private String clusterManagerDN; + + // guarded by rwLock + /** + * true if controller is the primary of the cluster + */ + private boolean primary; + + // guarded by rwLock + /** + * true if connected to a cluster + */ + private boolean connected; + + // guarded by rwLock + private String instanceId; + + private volatile boolean shutdown = false; + + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + private FlowFileSwapManager flowFileSwapManager; // guarded by read/write lock + + private static final Logger LOG = LoggerFactory.getLogger(FlowController.class); + private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"); + + public static FlowController createStandaloneInstance( + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final UserService userService, + final StringEncryptor encryptor) { + return new FlowController( + flowFileEventRepo, + properties, + userService, + encryptor, + /* configuredForClustering */ false, + /* NodeProtocolSender */ null); + } + + public static FlowController createClusteredInstance( + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final UserService userService, + final StringEncryptor encryptor, + final NodeProtocolSender protocolSender) { + final FlowController flowController = new FlowController( + flowFileEventRepo, + properties, + userService, + encryptor, + /* configuredForClustering */ true, + /* NodeProtocolSender */ protocolSender); + + flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure()); + + return flowController; + } + + private FlowController( + final FlowFileEventRepository flowFileEventRepo, + final NiFiProperties properties, + final UserService userService, + final StringEncryptor encryptor, + final boolean configuredForClustering, + final NodeProtocolSender protocolSender) { + + maxTimerDrivenThreads = new AtomicInteger(10); + maxEventDrivenThreads = new AtomicInteger(5); + + this.encryptor = encryptor; + this.properties = properties; + sslContext = SslContextFactory.createSslContext(properties, false); + extensionManager = new ExtensionManager(); + controllerServiceProvider = new StandardControllerServiceProvider(); + + timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); + eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process")); + + final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager); + flowFileRepository = flowFileRepo; + flowFileEventRepository = flowFileEventRepo; + counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); + + bulletinRepository = new VolatileBulletinRepository(); + nodeBulletinSubscriber = new AtomicReference<>(); + + try { + this.provenanceEventRepository = createProvenanceRepository(properties); + this.provenanceEventRepository.initialize(new EventReporter() { + @Override + public void reportEvent(final Severity severity, final String category, final String message) { + final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); + bulletinRepository.addBulletin(bulletin); + } + }); + + this.contentRepository = createContentRepository(properties); + } catch (final Exception e) { + throw new RuntimeException("Unable to create Provenance Repository", e); + } + + processScheduler = new StandardProcessScheduler(this, this, encryptor); + eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); + + final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository); + processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( + eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); + + final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); + final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); + processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); + processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); + processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); + processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); + + startConnectablesAfterInitialization = new ArrayList<>(); + startRemoteGroupPortsAfterInitialization = new ArrayList<>(); + this.userService = userService; + + final String gracefulShutdownSecondsVal = properties.getProperty(GRACEFUL_SHUTDOWN_PERIOD); + long shutdownSecs; + try { + shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal); + if (shutdownSecs < 1) { + shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS; + } + } catch (final NumberFormatException nfe) { + shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS; + } + gracefulShutdownSeconds = shutdownSecs; + + remoteInputSocketPort = properties.getRemoteInputPort(); + isSiteToSiteSecure = properties.isSiteToSiteSecure(); + + if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) { + throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured"); + } + + this.configuredForClustering = configuredForClustering; + this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS); + this.protocolSender = protocolSender; + try { + this.templateManager = new TemplateManager(properties.getTemplateDirectory()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + this.snippetManager = new SnippetManager(); + + rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor); + rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); + instanceId = UUID.randomUUID().toString(); + + if (Boolean.TRUE.equals(isSiteToSiteSecure) && sslContext == null) { + LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed."); + externalSiteListener = null; + } else if (remoteInputSocketPort == null) { + externalSiteListener = null; + } else { + // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol + RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class); + externalSiteListener = new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null); + externalSiteListener.setRootGroup(rootGroup); + } + + // Determine frequency for obtaining component status snapshots + final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + long snapshotMillis; + try { + snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); + } + + componentStatusRepository = createComponentStatusRepository(); + timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + componentStatusRepository.capture(getControllerStatus()); + } + }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); + + heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false)); + } + + private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ContentClaimManager contentClaimManager) { + final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: " + + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION); + } + + try { + final FlowFileRepository created = NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileRepository.class); + synchronized (created) { + created.initialize(contentClaimManager); + } + return created; + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private static FlowFileSwapManager createSwapManager(final NiFiProperties properties) { + final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION); + if (implementationClassName == null) { + return null; + } + + try { + return NarThreadContextClassLoader.createInstance(implementationClassName, FlowFileSwapManager.class); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public void initializeFlow() throws IOException { + writeLock.lock(); + try { + flowFileSwapManager = createSwapManager(properties); + + long maxIdFromSwapFiles = -1L; + if (flowFileSwapManager != null) { + if (flowFileRepository.isVolatile()) { + flowFileSwapManager.purge(); + } else { + maxIdFromSwapFiles = flowFileSwapManager.recoverSwappedFlowFiles(this, contentClaimManager); + } + } + + flowFileRepository.loadFlowFiles(this, maxIdFromSwapFiles + 1); + + // now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the + // ContentRepository to purge superfluous files + contentRepository.cleanup(); + + if (flowFileSwapManager != null) { + flowFileSwapManager.start(flowFileRepository, this, contentClaimManager); + } + + if (externalSiteListener != null) { + externalSiteListener.start(); + } + + timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + updateRemoteProcessGroups(); + } catch (final Throwable t) { + LOG.warn("Unable to update Remote Process Groups due to " + t); + if (LOG.isDebugEnabled()) { + LOG.warn("", t); + } + } + } + }, 0L, 30L, TimeUnit.SECONDS); + + initialized.set(true); + } finally { + writeLock.unlock(); + } + } + + /** + * <p> + * Causes any processors that were added to the flow with a 'delayStart' + * flag of true to now start + * </p> + */ + public void startDelayed() { + writeLock.lock(); + try { + LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size())); + for (final Connectable connectable : startConnectablesAfterInitialization) { + if (connectable.getScheduledState() == ScheduledState.DISABLED) { + continue; + } + + try { + if (connectable instanceof ProcessorNode) { + connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); + } else { + startConnectable(connectable); + } + } catch (final Throwable t) { + LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); + } + } + + startConnectablesAfterInitialization.clear(); + + int startedTransmitting = 0; + for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) { + try { + remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort); + startedTransmitting++; + } catch (final Throwable t) { + LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t}); + } + } + + LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting); + startRemoteGroupPortsAfterInitialization.clear(); + } finally { + writeLock.unlock(); + } + } + + private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: " + + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION); + } + + try { + final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class); + synchronized (contentRepo) { + contentRepo.initialize(contentClaimManager); + } + return contentRepo; + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private ProvenanceEventRepository createProvenanceRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: " + + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS); + } + + try { + return NarThreadContextClassLoader.createInstance(implementationClassName, ProvenanceEventRepository.class); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private ComponentStatusRepository createComponentStatusRepository() { + final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); + if (implementationClassName == null) { + throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " + + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); + } + + try { + return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Creates a connection between two Connectable objects. + * + * @param id required ID of the connection + * @param name the name of the connection, or <code>null</code> to leave the + * connection unnamed + * @param source required source + * @param destination required destination + * @param relationshipNames required collection of relationship names + * @return + * + * @throws NullPointerException if the ID, source, destination, or set of + * relationships is null. + * @throws IllegalArgumentException if <code>relationships</code> is an + * empty collection + */ + public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) { + final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler); + + final List<Relationship> relationships = new ArrayList<>(); + for (final String relationshipName : requireNonNull(relationshipNames)) { + relationships.add(new Relationship.Builder().name(relationshipName).build()); + } + + return builder.id(requireNonNull(id).intern()).name(name == null ? null : name.intern()).relationships(relationships).source(requireNonNull(source)).destination(destination).build(); + } + + /** + * Creates a new Label + * + * @param id + * @param text + * @return + * @throws NullPointerException if either argument is null + */ + public Label createLabel(final String id, final String text) { + return new StandardLabel(requireNonNull(id).intern(), text); + } + + /** + * Creates a funnel + * + * @param id + * @return + */ + public Funnel createFunnel(final String id) { + return new StandardFunnel(id.intern(), null, processScheduler); + } + + /** + * Creates a Port to use as an Input Port for a Process Group + * + * @param id + * @param name + * @return + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. + */ + public Port createLocalInputPort(String id, String name) { + id = requireNonNull(id).intern(); + name = requireNonNull(name).intern(); + verifyPortIdDoesNotExist(id); + return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, processScheduler); + } + + /** + * Creates a Port to use as an Output Port for a Process Group + * + * @param id + * @param name + * @return + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. + */ + public Port createLocalOutputPort(String id, String name) { + id = requireNonNull(id).intern(); + name = requireNonNull(name).intern(); + verifyPortIdDoesNotExist(id); + return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, processScheduler); + } + + /** + * Creates a ProcessGroup with the given ID + * + * @param id + * @return + * @throws NullPointerException if the argument is null + */ + public ProcessGroup createProcessGroup(final String id) { + return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor); + } + + /** + * <p> + * Creates a new ProcessorNode with the given type and identifier.</p> + * + * @param type + * @param id + * @return + * @throws NullPointerException if either arg is null + * @throws ProcessorInstantiationException if the processor cannot be + * instantiated for any reason + */ + public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException { + id = id.intern(); + final Processor processor = instantiateProcessor(type, id); + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); + final ProcessorNode procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider); + + final LogRepository logRepository = LogRepositoryFactory.getRepository(id); + logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); + + // TODO: We should only call this the first time that it is added to the graph.... + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); + } catch (final Exception e) { + logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); + throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e); + } + + return procNode; + } + + private Processor instantiateProcessor(final String type, final String identifier) throws ProcessorInstantiationException { + Processor processor; + + final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); + final Class<?> rawClass; + if (detectedClassLoaderForType == null) { + // try to find from the current class loader + rawClass = Class.forName(type); + } else { + // try to find from the registered classloader for that type + rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); + } + + Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); + final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class); + processor = processorClass.newInstance(); + final ProcessorLog processorLogger = new SimpleProcessLogger(identifier, processor); + final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, processorLogger, this); + processor.initialize(ctx); + return processor; + } catch (final Throwable t) { + throw new ProcessorInstantiationException(type, t); + } finally { + if (ctxClassLoader != null) { + Thread.currentThread().setContextClassLoader(ctxClassLoader); + } + } + } + + /** + * @return the ExtensionManager used for instantiating Processors, + * Prioritizers, etc. + */ + public ExtensionManager getExtensionManager() { + return extensionManager; + } + + public String getInstanceId() { + readLock.lock(); + try { + return instanceId; + } finally { + readLock.unlock(); + } + } + + /** + * Gets the BulletinRepository for storing and retrieving Bulletins. + * + * @return + */ + public BulletinRepository getBulletinRepository() { + return bulletinRepository; + } + + public SnippetManager getSnippetManager() { + return snippetManager; + } + + /** + * Creates a Port to use as an Input Port for the root Process Group, which + * is used for Site-to-Site communications + * + * @param id + * @param name + * @return + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. + */ + public Port createRemoteInputPort(String id, String name) { + id = requireNonNull(id).intern(); + name = requireNonNull(name).intern(); + verifyPortIdDoesNotExist(id); + return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); + } + + /** + * Creates a Port to use as an Output Port for the root Process Group, which + * is used for Site-to-Site communications and will queue flow files waiting + * to be delivered to remote instances + * + * @param id + * @param name + * @return + * @throws NullPointerException if the ID or name is not unique + * @throws IllegalStateException if an Input Port already exists with the + * same name or id. + */ + public Port createRemoteOutputPort(String id, String name) { + id = requireNonNull(id).intern(); + name = requireNonNull(name).intern(); + verifyPortIdDoesNotExist(id); + return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT, userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure)); + } + + /** + * Creates a new Remote Process Group with the given ID that points to the + * given URI + * + * @param id + * @param uri + * @return + * + * @throws NullPointerException if either argument is null + * @throws IllegalArgumentException if <code>uri</code> is not a valid URI. + */ + public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) { + return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext); + } + + /** + * Verifies that no output port exists with the given id or name. If this + * does not hold true, throws an IllegalStateException + * + * @param id + * @throws IllegalStateException + */ + private void verifyPortIdDoesNotExist(final String id) { + Port port = rootGroup.findOutputPort(id); + if (port != null) { + throw new IllegalStateException("An Input Port already exists with ID " + id); + } + port = rootGroup.findInputPort(id); + if (port != null) { + throw new IllegalStateException("An Input Port already exists with ID " + id); + } + } + + /** + * @return the name of this controller, which is also the name of the Root + * Group. + */ + public String getName() { + readLock.lock(); + try { + return rootGroup.getName(); + } finally { + readLock.unlock(); + } + } + + /** + * Sets the name for the Root Group, which also changes the name for the + * controller. + * + * @param name + */ + public void setName(final String name) { + readLock.lock(); + try { + rootGroup.setName(name); + } finally { + readLock.unlock(); + } + } + + /** + * Gets the comments of this controller, which is also the comment of the + * Root Group. + * + * @return + */ + public String getComments() { + readLock.lock(); + try { + return rootGroup.getComments(); + } finally { + readLock.unlock(); + } + } + + /** + * Sets the comment for the Root Group, which also changes the comment for + * the controller. + * + * @param comments + */ + public void setComments(final String comments) { + readLock.lock(); + try { + rootGroup.setComments(comments); + } finally { + readLock.unlock(); + } + } + + /** + * @return <code>true</code> if the scheduling engine for this controller + * has been terminated. + */ + public boolean isTerminated() { + this.readLock.lock(); + try { + return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated()); + } finally { + this.readLock.unlock(); + } + } + + /** + * Triggers the controller to begin shutdown, stopping all processors and + * terminating the scheduling engine. After calling this method, the + * {@link #isTerminated()} method will indicate whether or not the shutdown + * has finished. + * + * @param kill if <code>true</code>, attempts to stop all active threads, + * but makes no guarantee that this will happen + * + * @throws IllegalStateException if the controller is already stopped or + * currently in the processor of stopping + */ + public void shutdown(final boolean kill) { + this.shutdown = true; + stopAllProcessors(); + + writeLock.lock(); + try { + if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) { + throw new IllegalStateException("Controller already stopped or still stopping..."); + } + + if (kill) { + this.timerDrivenEngineRef.get().shutdownNow(); + this.eventDrivenEngineRef.get().shutdownNow(); + LOG.info("Initiated immediate shutdown of flow controller..."); + } else { + this.timerDrivenEngineRef.get().shutdown(); + this.eventDrivenEngineRef.get().shutdown(); + LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds"); + } + + clusterTaskExecutor.shutdown(); + + // Trigger any processors' methods marked with @OnShutdown to be called + rootGroup.shutdown(); + + try { + this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS); + this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + LOG.info("Interrupted while waiting for controller termination."); + } + + try { + flowFileRepository.close(); + } catch (final Throwable t) { + LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t}); + } + + if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) { + LOG.info("Controller has been terminated successfully."); + } else { + LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop. Might need to kill the program manually."); + } + + if (externalSiteListener != null) { + externalSiteListener.stop(); + } + + if (flowFileSwapManager != null) { + flowFileSwapManager.shutdown(); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Serializes the current state of the controller to the given OutputStream + * + * @param serializer + * @param os + * @throws FlowSerializationException if serialization of the flow fails for + * any reason + */ + public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { + readLock.lock(); + try { + serializer.serialize(this, os); + } finally { + readLock.unlock(); + } + } + + /** + * Synchronizes this controller with the proposed flow. + * + * For more details, see + * {@link FlowSynchronizer#sync(FlowController, DataFlow)}. + * + * @param synchronizer + * @param dataFlow the flow to load the controller with. If the flow is null + * or zero length, then the controller must not have a flow or else an + * UninheritableFlowException will be thrown. + * + * @throws FlowSerializationException if proposed flow is not a valid flow + * configuration file + * @throws UninheritableFlowException if the proposed flow cannot be loaded + * by the controller because in doing so would risk orphaning flow files + * @throws FlowSynchronizationException if updates to the controller failed. + * If this exception is thrown, then the controller should be considered + * unsafe to be used + */ + public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow) + throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + writeLock.lock(); + try { + LOG.debug("Synchronizing controller with proposed flow"); + synchronizer.sync(this, dataFlow, encryptor); + LOG.info("Successfully synchronized controller with proposed flow"); + } finally { + writeLock.unlock(); + } + } + + /** + * @return the currently configured maximum number of threads that can be + * used for executing processors at any given time. + */ + public int getMaxTimerDrivenThreadCount() { + return maxTimerDrivenThreads.get(); + } + + public int getMaxEventDrivenThreadCount() { + return maxEventDrivenThreads.get(); + } + + public void setMaxTimerDrivenThreadCount(final int maxThreadCount) { + writeLock.lock(); + try { + setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads); + } finally { + writeLock.unlock(); + } + } + + public void setMaxEventDrivenThreadCount(final int maxThreadCount) { + writeLock.lock(); + try { + setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads); + processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount); + } finally { + writeLock.unlock(); + } + } + + /** + * Updates the number of threads that can be simultaneously used for + * executing processors. + * + * @param maxThreadCount + * + * This method must be called while holding the write lock! + */ + private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) { + if (maxThreadCount < 1) { + throw new IllegalArgumentException(); + } + + maxThreads.getAndSet(maxThreadCount); + if (null != engine && engine.getCorePoolSize() < maxThreadCount) { + engine.setCorePoolSize(maxThreads.intValue()); + } + } + + /** + * @return the ID of the root group + */ + public String getRootGroupId() { + readLock.lock(); + try { + return rootGroup.getIdentifier(); + } finally { + readLock.unlock(); + } + } + + /** + * Sets the root group to the given group + * + * @param group the ProcessGroup that is to become the new Root Group + * + * @throws IllegalArgumentException if the ProcessGroup has a parent + * @throws IllegalStateException if the FlowController does not know about + * the given process group + */ + void setRootGroup(final ProcessGroup group) { + if (requireNonNull(group).getParent() != null) { + throw new IllegalArgumentException("A ProcessGroup that has a parent cannot be the Root Group"); + } + + writeLock.lock(); + try { + rootGroup = group; + + if (externalSiteListener != null) { + externalSiteListener.setRootGroup(group); + } + + // update the heartbeat bean + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected)); + } finally { + writeLock.unlock(); + } + } + + public SystemDiagnostics getSystemDiagnostics() { + final SystemDiagnosticsFactory factory = new SystemDiagnosticsFactory(); + return factory.create(flowFileRepository, contentRepository); + } + + // + // ProcessGroup access + // + /** + * Updates the process group corresponding to the specified DTO. Any field + * in DTO that is <code>null</code> (with the exception of the required ID) + * will be ignored. + * + * @param dto + * @return a fully-populated DTO representing the newly updated ProcessGroup + * @throws ProcessorInstantiationException + * + * @throws IllegalStateException if no process group can be found with the + * ID of DTO or with the ID of the DTO's parentGroupId, if the template ID + * specified is invalid, or if the DTO's Parent Group ID changes but the + * parent group has incoming or outgoing connections + * + * @throws NullPointerException if the DTO or its ID is null + */ + public void updateProcessGroup(final ProcessGroupDTO dto) throws ProcessorInstantiationException { + final ProcessGroup group = lookupGroup(requireNonNull(dto).getId()); + + final String name = dto.getName(); + final PositionDTO position = dto.getPosition(); + final String comments = dto.getComments(); + + if (name != null) { + group.setName(name); + } + if (position != null) { + group.setPosition(toPosition(position)); + } + if (comments != null) { + group.setComments(comments); + } + } + + // + // Template access + // + /** + * Adds a template to this controller. The contents of this template must be + * part of the current flow. This is going create a template based on a + * snippet of this flow. + * + * @param dto + * @return a copy of the given DTO + * @throws IOException if an I/O error occurs when persisting the Template + * @throws NullPointerException if the DTO is null + * @throws IllegalArgumentException if does not contain all required + * information, such as the template name or a processor's configuration + * element + */ + public Template addTemplate(final TemplateDTO dto) throws IOException { + return templateManager.addTemplate(dto); + } + + /** + * Removes all templates from this controller + * + * @throws IOException + */ + public void clearTemplates() throws IOException { + templateManager.clear(); + } + + /** + * Imports the specified template into this controller. The contents of this + * template may have come from another NiFi instance. + * + * @param dto + * @return + * @throws IOException + */ + public Template importTemplate(final TemplateDTO dto) throws IOException { + return templateManager.importTemplate(dto); + } + + /** + * Returns the template with the given ID, or <code>null</code> if no + * template exists with the given ID. + * + * @param id + * @return + */ + public Template getTemplate(final String id) { + return templateManager.getTemplate(id); + } + + public TemplateManager getTemplateManager() { + return templateManager; + } + + /** + * Returns all templates that this controller knows about. + * + * @return + */ + public Collection<Template> getTemplates() { + return templateManager.getTemplates(); + } + + /** + * Removes the template with the given ID. + * + * @param id the ID of the template to remove + * @throws NullPointerException if the argument is null + * @throws IllegalStateException if no template exists with the given ID + * @throws IOException if template could not be removed + */ + public void removeTemplate(final String id) throws IOException, IllegalStateException { + templateManager.removeTemplate(id); + } + + private Position toPosition(final PositionDTO dto) { + return new Position(dto.getX(), dto.getY()); + } + + // + // Snippet + // + /** + * Creates an instance of the given snippet and adds the components to the + * given group + * + * @param group + * @param dto + * + * @throws NullPointerException if either argument is null + * @throws IllegalStateException if the snippet is not valid because a + * component in the snippet has an ID that is not unique to this flow, or + * because it shares an Input Port or Output Port at the root level whose + * name already exists in the given ProcessGroup, or because the Template + * contains a Processor or a Prioritizer whose class is not valid within + * this instance of NiFi. + * @throws ProcessorInstantiationException if unable to instantiate a + * processor + */ + public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException { + writeLock.lock(); + try { + validateSnippetContents(requireNonNull(group), dto); + + // + // Instantiate the labels + // + for (final LabelDTO labelDTO : dto.getLabels()) { + final Label label = createLabel(labelDTO.getId(), labelDTO.getLabel()); + label.setPosition(toPosition(labelDTO.getPosition())); + if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) { + label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); + } + + // TODO: Update the label's "style" + group.addLabel(label); + } + + // + // Instantiate the funnels + for (final FunnelDTO funnelDTO : dto.getFunnels()) { + final Funnel funnel = createFunnel(funnelDTO.getId()); + funnel.setPosition(toPosition(funnelDTO.getPosition())); + group.addFunnel(funnel); + } + + // + // Instantiate Input Ports & Output Ports + // + for (final PortDTO portDTO : dto.getInputPorts()) { + final Port inputPort; + if (group.isRootGroup()) { + inputPort = createRemoteInputPort(portDTO.getId(), portDTO.getName()); + inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount()); + if (portDTO.getGroupAccessControl() != null) { + ((RootGroupPort) inputPort).setGroupAccessControl(portDTO.getGroupAccessControl()); + } + if (portDTO.getUserAccessControl() != null) { + ((RootGroupPort) inputPort).setUserAccessControl(portDTO.getUserAccessControl()); + } + } else { + inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName()); + } + + inputPort.setPosition(toPosition(portDTO.getPosition())); + inputPort.setProcessGroup(group); + inputPort.setComments(portDTO.getComments()); + group.addInputPort(inputPort); + } + + for (final PortDTO portDTO : dto.getOutputPorts()) { + final Port outputPort; + if (group.isRootGroup()) { + outputPort = createRemoteOutputPort(portDTO.getId(), portDTO.getName()); + outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount()); + if (portDTO.getGroupAccessControl() != null) { + ((RootGroupPort) outputPort).setGroupAccessControl(portDTO.getGroupAccessControl()); + } + if (portDTO.getUserAccessControl() != null) { + ((RootGroupPort) outputPort).setUserAccessControl(portDTO.getUserAccessControl()); + } + } else { + outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName()); + } + + outputPort.setPosition(toPosition(portDTO.getPosition())); + outputPort.setProcessGroup(group); + outputPort.setComments(portDTO.getComments()); + group.addOutputPort(outputPort); + } + + // + // Instantiate the processors + // + for (final ProcessorDTO processorDTO : dto.getProcessors()) { + final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId()); + + procNode.setPosition(toPosition(processorDTO.getPosition())); + procNode.setProcessGroup(group); + + final ProcessorConfigDTO config = processorDTO.getConfig(); + procNode.setComments(config.getComments()); + if (config.isLossTolerant() != null) { + procNode.setLossTolerant(config.isLossTolerant()); + } + procNode.setName(processorDTO.getName()); + + procNode.setYieldPeriod(config.getYieldDuration()); + procNode.setPenalizationPeriod(config.getPenaltyDuration()); + procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel())); + procNode.setAnnotationData(config.getAnnotationData()); + procNode.setStyle(processorDTO.getStyle()); + + if (config.getRunDurationMillis() != null) { + procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS); + } + + if (config.getSchedulingStrategy() != null) { + procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); + } + + // ensure that the scheduling strategy is set prior to these values + procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); + procNode.setScheduldingPeriod(config.getSchedulingPeriod()); + + final Set<Relationship> relationships = new HashSet<>(); + if (processorDTO.getRelationships() != null) { + for (final RelationshipDTO rel : processorDTO.getRelationships()) { + if (rel.isAutoTerminate()) { + relationships.add(procNode.getRelationship(rel.getName())); + } + } + procNode.setAutoTerminatedRelationships(relationships); + } + + if (config.getProperties() != null) { + for (Map.Entry<String, String> entry : config.getProperties().entrySet()) { + if (entry.getValue() != null) { + procNode.setProperty(entry.getKey(), entry.getValue()); + } + } + } + + group.addProcessor(procNode); + } + + // + // Instantiate Remote Process Groups + // + for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) { + final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri()); + remoteGroup.setComments(remoteGroupDTO.getComments()); + remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition())); + remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout()); + remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration()); + remoteGroup.setProcessGroup(group); + + // set the input/output ports + if (remoteGroupDTO.getContents() != null) { + final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents(); + + // ensure there input ports + if (contents.getInputPorts() != null) { + remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts())); + } + + // ensure there are output ports + if (contents.getOutputPorts() != null) { + remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts())); + } + } + + group.addRemoteProcessGroup(remoteGroup); + } + + // + // Instantiate ProcessGroups + // + for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) { + final ProcessGroup childGroup = createProcessGroup(groupDTO.getId()); + childGroup.setParent(group); + childGroup.setPosition(toPosition(groupDTO.getPosition())); + childGroup.setComments(groupDTO.getComments()); + childGroup.setName(groupDTO.getName()); + group.addProcessGroup(childGroup); + + final FlowSnippetDTO contents = groupDTO.getContents(); + + // we want this to be recursive, so we will create a new template that contains only + // the contents of this child group and recursively call ourselves. + final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO(); + childTemplateDTO.setConnections(contents.getConnections()); + childTemplateDTO.setInputPorts(contents.getInputPorts()); + childTemplateDTO.setLabels(contents.getLabels()); + childTemplateDTO.setOutputPorts(contents.getOutputPorts()); + childTemplateDTO.setProcessGroups(contents.getProcessGroups()); + childTemplateDTO.setProcessors(contents.getProcessors()); + childTemplateDTO.setFunnels(contents.getFunnels()); + childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups()); + instantiateSnippet(childGroup, childTemplateDTO); + } + + // + // Instantiate Connections + // + for (final ConnectionDTO connectionDTO : dto.getConnections()) { + final ConnectableDTO sourceDTO = connectionDTO.getSource(); + final ConnectableDTO destinationDTO = connectionDTO.getDestination(); + final Connectable source; + final Connectable destination; + + // locate the source and destination connectable. if this is a remote port + // we need to locate the remote process groups. otherwise we need to + // find the connectable given its parent group. + // NOTE: (getConnectable returns ANY connectable, when the parent is + // not this group only input ports or output ports should be returned. if something + // other than a port is returned, an exception will be thrown when adding the + // connection below.) + // see if the source connectable is a remote port + if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) { + final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId()); + source = remoteGroup.getOutputPort(sourceDTO.getId()); + } else { + final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId()); + source = sourceGroup.getConnectable(sourceDTO.getId()); + } + + // see if the destination connectable is a remote port + if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) { + final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId()); + destination = remoteGroup.getInputPort(destinationDTO.getId()); + } else { + final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId()); + destination = destinationGroup.getConnectable(destinationDTO.getId()); + } + + // determine the selection relationships for this connection + final Set<String> relationships = new HashSet<>(); + if (connectionDTO.getSelectedRelationships() != null) { + relationships.addAll(connectionDTO.getSelectedRelationships()); + } + + final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships); + + if (connectionDTO.getBends() != null) { + final List<Position> bendPoints = new ArrayList<>(); + for (final PositionDTO bend : connectionDTO.getBends()) { + bendPoints.add(new Position(bend.getX(), bend.getY())); + } + connection.setBendPoints(bendPoints); + } + + final FlowFileQueue queue = connection.getFlowFileQueue(); + queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold()); + queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold()); + queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration()); + + final List<String> prioritizers = connectionDTO.getPrioritizers(); + if (prioritizers != null) { + final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers); + final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>(); + for (final String className : newPrioritizersClasses) { + try { + newPrioritizers.add(createPrioritizer(className)); + } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e); + } + } + queue.setPriorities(newPrioritizers); + } + + connection.setProcessGroup(group); + group.addConnection(connection); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Converts a set of ports into a set of remote process group ports. + * + * @param ports + * @return + */ + private Set<RemoteProcessGroupPortDescriptor> convertRemotePort(final Set<RemoteProcessGroupPortDTO> ports) { + Set<RemoteProcessGroupPortDescriptor> remotePorts = null; + if (ports != null) { + remotePorts = new LinkedHashSet<>(ports.size()); + for (RemoteProcessGroupPortDTO port : ports) { + final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); + descriptor.setId(port.getId()); + descriptor.setName(port.getName()); + descriptor.setComments(port.getComments()); + descriptor.setTargetRunning(port.isTargetRunning()); + descriptor.setConnected(port.isConnected()); + descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount()); + descriptor.setTransmitting(port.isTransmitting()); + descriptor.setUseCompression(port.getUseCompression()); + remotePorts.add(descriptor); + } + } + return remotePorts; + } + + /** + * Returns the parent of the specified Connectable. This only considers this + * group and any direct child sub groups. + * + * @param parentGroupId + * @return + */ + private ProcessGroup getConnectableParent(final ProcessGroup group, final String parentGroupId) { + if (areGroupsSame(group.getIdentifier(), parentGroupId)) { + return group; + } else { + return group.getProcessGroup(parentGroupId); + } + } + + /** + * <p> + * Verifies that the given DTO is valid, according to the following: + * + * <ul> + * <li>None of the ID's in any component of the DTO can be used in this + * flow.</li> + * <li>The ProcessGroup to which the template's contents will be added must + * not contain any InputPort or OutputPort with the same name as one of the + * corresponding components in the root level of the template.</li> + * <li>All Processors' classes must exist in this instance.</li> + * <li>All Flow File Prioritizers' classes must exist in this instance.</li> + * </ul> + * </p> + * + * <p> + * If any of the above statements does not hold true, an + * {@link IllegalStateException} or a + * {@link ProcessorInstantiationException} will be thrown. + * </p> + * + * @param group + * @param templateContents + */ + private void validateSnippetContents(final ProcessGroup group, final FlowSnippetDTO templateContents) { + // validate the names of Input Ports + for (final PortDTO port : templateContents.getInputPorts()) { + if (group.getInputPortByName(port.getName()) != null) { + throw new IllegalStateException("ProcessGroup already has an Input Port with name " + port.getName()); + } + } + + // validate the names of Output Ports + for (final PortDTO port : templateContents.getOutputPorts()) { + if (group.getOutputPortByName(port.getName()) != null) { + throw new IllegalStateException("ProcessGroup already has an Output Port with name " + port.getName()); + } + } + + // validate that all Processor Types and Prioritizer Types are valid + final List<String> processorClasses = new ArrayList<>(); + for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) { + processorClasses.add(c.getName()); + } + final List<String> prioritizerClasses = new ArrayList<>(); + for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) { + prioritizerClasses.add(c.getName()); + } + + final Set<ProcessorDTO> allProcs = new HashSet<>(); + final Set<ConnectionDTO> allConns = new HashSet<>(); + allProcs.addAll(templateContents.getProcessors()); + allConns.addAll(templateContents.getConnections()); + for (final ProcessGroupDTO childGroup : templateContents.getProcessGroups()) { + allProcs.addAll(findAllProcessors(childGroup)); + allConns.addAll(findAllConnections(childGroup)); + } + + for (final ProcessorDTO proc : allProcs) { + if (!processorClasses.contains(proc.getType())) { + throw new IllegalStateException("Invalid Processor Type: " + proc.getType()); + } + } + + for (final ConnectionDTO conn : allConns) { + final List<String> prioritizers = conn.getPrioritizers(); + if (prioritizers != null) { + for (final String prioritizer : prioritizers) { + if (!prioritizerClasses.contains(prioritizer)) { + throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer); + } + } + } + } + } + + /** + * Recursively finds all ProcessorDTO's + * + * @param group + * @return + */ + private Set<ProcessorDTO> findAllProcessors(final ProcessGroupDTO group) { + final Set<ProcessorDTO> procs = new HashSet<>(); + for (final ProcessorDTO dto : group.getContents().getProcessors()) { + procs.add(dto); + } + + for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) { + procs.addAll(findAllProcessors(childGroup)); + } + return procs; + } + + /** + * Recursively finds all ConnectionDTO's + * + * @param group + * @return + */ + private Set<ConnectionDTO> findAllConnections(final ProcessGroupDTO group) { + final Set<ConnectionDTO> conns = new HashSet<>(); + for (final ConnectionDTO dto : group.getContents().getConnections()) { + conns.add(dto); + } + + for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) { + conns.addAll(findAllConnections(childGroup)); + } + return conns; + } + + // + // Processor access + // + /** + * Indicates whether or not the two ID's point to the same ProcessGroup. If + * either id is null, will return <code>false</code. + * + * @param id1 + * @param id2 + * @return + */ + public boolean areGroupsSame(final String id1, final String id2) { + if (id1 == null || id2 == null) { + return false; + } else if (id1.equals(id2)) { + return true; + } else { + final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1); + final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2); + return (comparable1.equals(comparable2)); + } + } + + public FlowFilePrioritizer createPrioritizer(final String type) throws InstantiationException, IllegalAccessException, ClassNotFoundException { + FlowFilePrioritizer prioritizer; + + final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); + try { + final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); + final Class<?> rawClass; + if (detectedClassLoaderForType == null) { + // try to find from the current class loader + rawClass = Class.forName(type); + } else { + // try to find from the registered classloader for that type + rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); + } + + Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); + final Class<? extends FlowFilePrioritizer> prioritizerClass = rawClass.asSubclass(FlowFilePrioritizer.class); + final Object processorObj = prioritizerClass.newInstance(); + prioritizer = prioritizerClass.cast(processorObj); + + return prioritizer; + } finally { + if (ctxClassLoader != null) { + Thread.currentThread().setContextClassLoader(ctxClassLoader); + } + } + } + + // + // InputPort access + // + public PortDTO updateInputPort(final String parentGroupId, final PortDTO dto) { + final ProcessGroup parentGroup = lookupGroup(parentGroupId); + final Port port = parentGroup.getInputPort(dto.getId()); + if (port == null) { + throw new IllegalStateException("No Input Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId); + } + + final String name = dto.getName(); + if (dto.getPosition() != null) { + port.setPosition(toPosition(dto.getPosition())); + } + + if (name != null) { + port.setName(name); + } + + return createDTO(port); + } + + private PortDTO createDTO(final Port port) { + if (port == null) { + return null; + } + + final PortDTO dto = new PortDTO(); + dto.setId(port.getIdentifier()); + dto.setPosition(new PositionDTO(port.getPosition().getX(), port.getPosition().getY())); + dto.setName(port.getName()); + dto.setParentGroupId(port.getProcessGroup().getIdentifier()); + + return dto; + } + + // + // OutputPort access + // + public PortDTO updateOutputPort(final String parentGroupId, final PortDTO dto) { + final ProcessGroup parentGroup = lookupGroup(parentGroupId); + final Port port = parentGroup.getOutputPort(dto.getId()); + if (port == null) { + throw new IllegalStateException("No Output Port with ID " + dto.getId() + " is known as a child of ProcessGroup with ID " + parentGroupId); + } + + final String name = dto.getName(); + if (name != null) { + port.setName(name); + } + + if (dto.getPosition() != null) { + port.setPosition(toPosition(dto.getPosition())); + } + + return createDTO(port); + } + + // + // Processor/Prioritizer/Filter Class Access + // + @SuppressWarnings("rawtypes") + public Set<Class> getFlowFileProcessorClasses() { + return ExtensionManager.getExtensions(Processor.class); + } + + @SuppressWarnings("rawtypes") + public Set<Class> getFlowFileComparatorClasses() { + return ExtensionManager.getExtensions(FlowFilePrioritizer.class); + } + + /** + * Returns the ProcessGroup with the given ID + * + * @param id + * @return the process group or null if not group is found + */ + private ProcessGroup lookupGroup(final String id) { + final ProcessGroup group = getGroup(id); + if (group == null) { + throw new IllegalStateException("No Group with ID " + id + " exists"); + } + return group; + } + + /** + * Returns the ProcessGroup with the given ID + * + * @param id + * @return the process group or null if not group is found + */ + public ProcessGroup getGroup(final String id) { + requireNonNull(id); + final ProcessGroup root; + readLock.lock(); + try { + root = rootGroup; + } finally { + readLock.unlock(); + } + + final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id; + return (root == null) ? null : root.findProcessGroup(searchId); + } + + @Override + public ProcessGroupStatus getControllerStatus() { + return getGroupStatus(getRootGroupId()); + } + + public ProcessGroupStatus getGroupStatus(final String groupId) { + return getGroupStatus(groupId, getProcessorStats()); + } + + public ProcessGroupStatus getGroupStatus(final String groupId, final RepositoryStatusReport statusReport) { + final ProcessGroup group = getGroup(groupId); + return getGroupStatus(group, statusReport); + } + + public ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport) { + if (group == null) { + return null; + } + + final ProcessGroupStatus status = new ProcessGroupStatus(); + status.setId(group.getIdentifier()); + status.setName(group.getName()); + status.setCreationTimestamp(new Date().getTime()); + int activeGroupThreads = 0; + long bytesRead = 0L; + long bytesWritten = 0L; + int queuedCount = 0; + long queuedContentSize = 0L; + int flowFilesIn = 0; + long bytesIn = 0L; + int flowFilesOut = 0; + long bytesOut = 0L; + int flowFilesReceived = 0; + long bytesReceived = 0L; + int flowFilesSent = 0; + long bytesSent = 0L; + + // set status for processors + final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>(); + status.setProcessorStatus(processorStatusCollection); + for (final ProcessorNode procNode : group.getProcessors()) { + final ProcessorStatus procStat = getProcessorStatus(statusReport, procNode); + processorStatusCollection.add(procStat); + activeGroupThreads += procStat.getActiveThreadCount(); + bytesRead += procStat.getBytesRead(); + bytesWritten += procStat.getBytesWritten(); + + flowFilesReceived += procStat.getFlowFilesReceived(); + bytesReceived += procStat.getBytesReceived(); + flowFilesSent += procStat.getFlowFilesSent(); + bytesSent += procStat.getBytesSent(); + } + + // set status for local child groups + final Collection<ProcessGroupStatus> localChildGroupStatusCollection = new ArrayList<>(); + status.setProcessGroupStatus(localChildGroupStatusCollection); + for (final ProcessGroup childGroup : group.getProcessGroups()) { + final ProcessGroupStatus childGroupStatus = getGroupStatus(childGroup, statusReport); + localChildGroupStatusCollection.add(childGroupStatus); + activeGroupThreads += childGroupStatus.getActiveThreadCount(); + bytesRead += childGroupStatus.getBytesRead(); + bytesWritten += childGroupStatus.getBytesWritten(); + queuedCount += childGroupStatus.getQueuedCount(); + queuedContentSize += childGroupStatus.getQueuedContentSize(); + + flowFilesReceived += childGroupStatus.getFlowFilesReceived(); + bytesReceived += childGroupStatus.getBytesReceived(); + flowFilesSent += childGroupStatus.getFlowFilesSent(); + bytesSent += childGroupStatus.getBytesSent(); + } + + // set status for remote child groups + final Collection<RemoteProcessGroupStatus> remoteProcessGroupStatusCollection = new ArrayList<>(); + status.setRemoteProcessGroupStatus(remoteProcessGroupStatusCollection); + for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) { + final RemoteProcessGroupStatus remoteStatus = createRemoteGroupStatus(remoteGroup, statusReport); + if (remoteStatus != null) { + remoteProcessGroupStatusCollection.add(remoteStatus); + + flowFilesReceived += remoteStatus.getReceivedCount(); + bytesReceived += remoteStatus.getReceivedContentSize(); + flowFilesSent += remoteStatus.getSentCount(); + bytesSent += remoteStatus.getSentContentSize(); + } + } + + // connection status + final Collection<ConnectionStatus> connectionStatusCollection = new ArrayList<>(); + status.setConnectionStatus(connectionStatusCollection); + + // get the connection and remote port status + for (final Connection conn : group.getConnections()) { + final ConnectionStatus connStatus = new ConnectionStatus(); + connStatus.setId(conn.getIdentifier()); + connStatus.setGroupId(conn.getProcessGroup().getIdentifier()); + connStatus.setSourceId(conn.getSource().getIdentifier()); + connStatus.setSourceName(conn.getSource().getName()); + connStatus.setDestinationId(conn.getDestination().getIdentifier()); + connStatus.setDestinationName(conn.getDestination().getName()); + + final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier()); + if (connectionStatusReport != null) { + connStatus.setInputBytes(connectionStatusReport.getContentSizeIn()); + connStatus.setInputCount(connectionStatusReport.getFlowFilesIn()); + connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut()); + connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut()); + } + + if (StringUtils.isNotBlank(conn.getName())) { + connStatus.setName(conn.getName()); + } else if (conn.getRelationships() != null && !conn.getRelationships().isEmpty()) { + final Collection<String> relationships = new ArrayList<>(conn.getRelationships().size()); + for (final Relationship relationship : conn.getRelationships()) { + relationships.add(relationship.getName()); + } +
<TRUNCATED>