http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
new file mode 100644
index 0000000..3defea7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -0,0 +1,3616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import javax.xml.validation.Validator;
+
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.cluster.BulletinsPayload;
+import org.apache.nifi.cluster.ClusterNodeInformation;
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.NodeInformation;
+import org.apache.nifi.cluster.context.ClusterContext;
+import org.apache.nifi.cluster.context.ClusterContextImpl;
+import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.event.EventManager;
+import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
+import org.apache.nifi.cluster.flow.ClusterDataFlow;
+import org.apache.nifi.cluster.flow.DaoException;
+import org.apache.nifi.cluster.flow.DataFlowManagementService;
+import org.apache.nifi.cluster.flow.PersistedFlowState;
+import org.apache.nifi.cluster.manager.HttpClusterManager;
+import org.apache.nifi.cluster.manager.HttpRequestReplicator;
+import org.apache.nifi.cluster.manager.HttpResponseMapper;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
+import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
+import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
+import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
+import org.apache.nifi.cluster.manager.exception.NodeReconnectionException;
+import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
+import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeBulletins;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
+import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.Heartbeater;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
+import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.diagnostics.GarbageCollection;
+import org.apache.nifi.diagnostics.StorageUsage;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.framework.security.util.SslContextFactory;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.nar.NarThreadContextClassLoader;
+import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.remote.RemoteResourceManager;
+import org.apache.nifi.remote.RemoteSiteListener;
+import org.apache.nifi.remote.SocketRemoteSiteListener;
+import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.ReportingTask;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.DomUtils;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
+import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
+import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorsEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEntity;
+import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+import org.apache.nifi.web.util.WebUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.DOMException;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+import org.xml.sax.SAXParseException;
+
+import com.sun.jersey.api.client.ClientResponse;
+
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+
+/**
+ * Provides a cluster manager implementation. The manager federates incoming
+ * HTTP client requests to the nodes' external API using the HTTP protocol. The
+ * manager also communicates with nodes using the nodes' internal socket
+ * protocol.
+ *
+ * The manager's socket address may be broadcast using multicast if a
+ * MulticastServiceBroadcaster instance is set on this instance. The manager
+ * instance must be started after setting the broadcaster.
+ *
+ * The manager may be configured with an EventManager for recording noteworthy
+ * lifecycle events (e.g., first heartbeat received, node status change).
+ *
+ * The start() and stop() methods must be called to initialize and stop the
+ * instance.
+ *
+ * @author unattributed
+ */
+public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider {
+
+    public static final String ROOT_GROUP_ID_ALIAS = "root";
+    public static final String BULLETIN_CATEGORY = "Clustering";
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
+    private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"));
+
+    /**
+     * The HTTP header to store a cluster context. An example of what may be
+     * stored in the context is a node's auditable actions in response to a
+     * cluster request. The cluster context is serialized using Java's
+     * serialization mechanism and hex encoded.
+     */
+    public static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext";
+
+    /**
+     * HTTP Header that stores a unique ID for each request that is replicated
+     * to the nodes. This is used for logging purposes so that request
+     * information, such as timing, can be correlated between the NCM and the
+     * nodes.
+     */
+    public static final String REQUEST_ID_HEADER = "X-RequestID";
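+
+    /*
+     * Illustrative sketch (an assumption, not the replicator's actual code):
+     * generating the unique value carried in the X-RequestID header so that
+     * NCM and node logs can be correlated.
+     */
+    private static String generateRequestId() {
+        return UUID.randomUUID().toString();
+    }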
+
+    /**
+     * The HTTP header that the NCM specifies to ask a node whether it is able to
+     * process a given request. The value is always 150-NodeContinue. The node
+     * will respond with 150 CONTINUE if it is able to process the request, 417
+     * EXPECTATION_FAILED otherwise.
+     */
+    public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
+    public static final int NODE_CONTINUE_STATUS_CODE = 150;
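+
+    /*
+     * Illustrative sketch (an assumption, not the actual replication logic):
+     * checking a node's reply to the 150-NodeContinue handshake described
+     * above before committing a mutable request.
+     */
+    private static boolean nodeAgreedToContinue(final ClientResponse response) {
+        return response.getStatus() == NODE_CONTINUE_STATUS_CODE;
+    }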
+
+    /**
+     * The HTTP header that the NCM specifies to indicate that a node should
+     * invalidate the specified user group. This is done to ensure that user
+     * cache is not stale when an administrator modifies a group through the UI.
+     */
+    public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = "X-ClusterInvalidateUserGroup";
+
+    /**
+     * The HTTP header that the NCM specifies to indicate that a node should
+     * invalidate the specified user. This is done to ensure that user cache is
+     * not stale when an administrator modifies a user through the UI.
+     */
+    public static final String CLUSTER_INVALIDATE_USER_HEADER = "X-ClusterInvalidateUser";
+
+    /**
+     * The default number of seconds a connecting node is told to wait before
+     * trying again when the manager cannot provide it with a current data flow.
+     */
+    private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
+
+    public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
+
+    public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
+    public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
+
+    public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
+    public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
+
+    public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
+    public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
+    public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
+
+    public static final String PROVENANCE_URI = "/nifi-api/controller/provenance";
+    public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
+    public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
+
+    private final NiFiProperties properties;
+    private final HttpRequestReplicator httpRequestReplicator;
+    private final HttpResponseMapper httpResponseMapper;
+    private final DataFlowManagementService dataFlowManagementService;
+    private final ClusterManagerProtocolSenderListener senderListener;
+    private final StringEncryptor encryptor;
+    private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
+    private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock();
+    private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read");
+    private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
+
+    private final Set<Node> nodes = new HashSet<>();
+    private final Set<ReportingTaskNode> reportingTasks = new HashSet<>();
+
+    // null means the dataflow should be read from disk
+    private StandardDataFlow cachedDataFlow = null;
+    private NodeIdentifier primaryNodeId = null;
+    private Revision revision = new Revision(0L, "");
+    private Timer heartbeatMonitor;
+    private Timer heartbeatProcessor;
+    private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
+    private volatile EventManager eventManager = null;
+    private volatile ClusterNodeFirewall clusterFirewall = null;
+    private volatile AuditService auditService = null;
+    private volatile ControllerServiceProvider controllerServiceProvider = null;
+
+    private final RemoteSiteListener remoteSiteListener;
+    private final Integer remoteInputPort;
+    private final Boolean remoteCommsSecure;
+    private final BulletinRepository bulletinRepository;
+    private final String instanceId;
+    private final FlowEngine reportingTaskEngine;
+    private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>();
+    private final StandardProcessScheduler processScheduler;
+    private final long componentStatusSnapshotMillis;
+
+    public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
+            final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
+            final NiFiProperties properties, final StringEncryptor encryptor) {
+
+        if (httpRequestReplicator == null) {
+            throw new IllegalArgumentException("HttpRequestReplicator may not be null.");
+        } else if (httpResponseMapper == null) {
+            throw new IllegalArgumentException("HttpResponseMapper may not be null.");
+        } else if (dataFlowManagementService == null) {
+            throw new IllegalArgumentException("DataFlowManagementService may not be null.");
+        } else if (senderListener == null) {
+            throw new IllegalArgumentException("ClusterManagerProtocolSenderListener may not be null.");
+        } else if (properties == null) {
+            throw new IllegalArgumentException("NiFiProperties may not be null.");
+        }
+
+        // Ensure that our encryptor/decryptor is properly initialized
+        this.httpRequestReplicator = httpRequestReplicator;
+        this.httpResponseMapper = httpResponseMapper;
+        this.dataFlowManagementService = dataFlowManagementService;
+        this.properties = properties;
+        this.controllerServiceProvider = new StandardControllerServiceProvider();
+        this.bulletinRepository = new VolatileBulletinRepository();
+        this.instanceId = UUID.randomUUID().toString();
+        this.senderListener = senderListener;
+        this.encryptor = encryptor;
+        senderListener.addHandler(this);
+        senderListener.setBulletinRepository(bulletinRepository);
+
+        final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
+        long snapshotMillis;
+        try {
+            snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
+        }
+        componentStatusSnapshotMillis = snapshotMillis;
+
+        remoteInputPort = properties.getRemoteInputPort();
+        if (remoteInputPort == null) {
+            remoteSiteListener = null;
+            remoteCommsSecure = null;
+        } else {
+            // Register the ClusterManagerServerProtocol as the appropriate resource for site-to-site Server Protocol
+            RemoteResourceManager.setServerProtocolImplementation(ClusterManagerServerProtocol.RESOURCE_NAME, ClusterManagerServerProtocol.class);
+            remoteCommsSecure = properties.isSiteToSiteSecure();
+            if (remoteCommsSecure) {
+                final SSLContext sslContext = SslContextFactory.createSslContext(properties, false);
+
+                if (sslContext == null) {
+                    throw new IllegalStateException("NiFi is configured to allow secure Site-to-Site communications but the Keystore/Truststore properties are not configured");
+                }
+
+                remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), sslContext, this);
+            } else {
+                remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), null, this);
+            }
+        }
+
+        reportingTaskEngine = new FlowEngine(8, "Reporting Task Thread");
+
+        processScheduler = new StandardProcessScheduler(new Heartbeater() {
+            @Override
+            public void heartbeat() {
+            }
+        }, this, encryptor);
+        processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
+        processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
+        processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
+    }
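+
+    /*
+     * Illustrative usage sketch (an assumption; in practice the framework
+     * wires these collaborators together): construct the manager, then drive
+     * the start()/stop() lifecycle described in the class javadoc.
+     *
+     *   final WebClusterManager manager = new WebClusterManager(replicator,
+     *           responseMapper, flowService, senderListener, properties, encryptor);
+     *   manager.start();
+     *   // ... service requests, heartbeats, etc. ...
+     *   manager.stop();
+     */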
+
+    public void start() throws IOException {
+        writeLock.lock();
+        try {
+
+            if (isRunning()) {
+                throw new IllegalStateException("Instance is already 
started.");
+            }
+
+            try {
+                // setup heartbeat monitoring
+                heartbeatMonitor = new Timer("Heartbeat Monitor", /* is daemon */ true);
+                heartbeatMonitor.scheduleAtFixedRate(new HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 1000);
+
+                heartbeatProcessor = new Timer("Process Pending Heartbeats", true);
+                final int processPendingHeartbeatDelay = 1000 * Math.max(1, getClusterProtocolHeartbeatSeconds() / 2);
+                heartbeatProcessor.schedule(new ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, processPendingHeartbeatDelay);
+
+                // start request replication service
+                httpRequestReplicator.start();
+
+                // start protocol service
+                senderListener.start();
+
+                // start flow management service
+                dataFlowManagementService.start();
+
+                if (remoteSiteListener != null) {
+                    remoteSiteListener.start();
+                }
+
+                // load flow
+                if (dataFlowManagementService.isFlowCurrent()) {
+                    final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+                    cachedDataFlow = clusterDataFlow.getDataFlow();
+                    primaryNodeId = clusterDataFlow.getPrimaryNodeId();
+                } else {
+                    throw new IOException("Flow is not current.");
+                }
+
+                // start multicast broadcasting service, if configured
+                if (servicesBroadcaster != null) {
+                    servicesBroadcaster.start();
+                }
+
+                // start in safe mode
+                executeSafeModeTask();
+
+                // Load and start running Reporting Tasks
+                final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
+                reportingTasks.addAll(loadReportingTasks(taskFile));
+            } catch (final IOException ioe) {
+                logger.warn("Failed to initialize cluster services due to: " + 
ioe, ioe);
+                stop();
+                throw ioe;
+            }
+
+        } finally {
+            writeLock.unlock("START");
+        }
+    }
+
+    public void stop() throws IOException {
+        writeLock.lock();
+        try {
+
+            // returns true if any service is running
+            if (!isRunning()) {
+                throw new IllegalStateException("Instance is already stopped.");
+            }
+
+            boolean encounteredException = false;
+
+            // stop the heartbeat monitoring
+            if (isHeartbeatMonitorRunning()) {
+                heartbeatMonitor.cancel();
+                heartbeatMonitor = null;
+            }
+
+            if (heartbeatProcessor != null) {
+                heartbeatProcessor.cancel();
+                heartbeatProcessor = null;
+            }
+
+            // stop the HTTP request replicator service
+            if (httpRequestReplicator.isRunning()) {
+                httpRequestReplicator.stop();
+            }
+
+            // stop the flow management service
+            if (dataFlowManagementService.isRunning()) {
+                dataFlowManagementService.stop();
+            }
+
+            if (remoteSiteListener != null) {
+                remoteSiteListener.stop();
+            }
+
+            // stop the protocol listener service
+            if (senderListener.isRunning()) {
+                try {
+                    senderListener.stop();
+                } catch (final IOException ioe) {
+                    encounteredException = true;
+                    logger.warn("Failed to shutdown protocol service due to: " 
+ ioe, ioe);
+                }
+            }
+
+            // stop the service broadcaster
+            if (isBroadcasting()) {
+                servicesBroadcaster.stop();
+            }
+
+            if (encounteredException) {
+                throw new IOException("Failed to shutdown Cluster Manager 
because one or more cluster services failed to shutdown.  Check the logs for 
details.");
+            }
+
+        } finally {
+            writeLock.unlock("STOP");
+        }
+    }
+
+    public boolean isRunning() {
+        readLock.lock();
+        try {
+            return isHeartbeatMonitorRunning()
+                    || httpRequestReplicator.isRunning()
+                    || senderListener.isRunning()
+                    || dataFlowManagementService.isRunning()
+                    || isBroadcasting();
+        } finally {
+            readLock.unlock("isRunning");
+        }
+    }
+
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return MessageType.CONNECTION_REQUEST == msg.getType()
+                || MessageType.HEARTBEAT == msg.getType()
+                || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
+                || MessageType.BULLETINS == msg.getType()
+                || MessageType.RECONNECTION_FAILURE == msg.getType();
+    }
+
+    @Override
+    public ProtocolMessage handle(final ProtocolMessage protocolMessage) throws ProtocolException {
+        switch (protocolMessage.getType()) {
+            case CONNECTION_REQUEST:
+                return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
+            case HEARTBEAT:
+                final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) protocolMessage;
+
+                final Heartbeat original = heartbeatMessage.getHeartbeat();
+                final NodeIdentifier originalNodeId = original.getNodeIdentifier();
+                final Heartbeat heartbeatWithDn = new Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), original.isPrimary(), original.isConnected(), original.getPayload());
+
+                handleHeartbeat(heartbeatWithDn);
+                return null;
+            case CONTROLLER_STARTUP_FAILURE:
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        handleControllerStartupFailure((ControllerStartupFailureMessage) protocolMessage);
+                    }
+                }, "Handle Controller Startup Failure Message from " + ((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start();
+                return null;
+            case RECONNECTION_FAILURE:
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage);
+                    }
+                }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
+                return null;
+            case BULLETINS:
+                final NodeBulletinsMessage bulletinsMessage = (NodeBulletinsMessage) protocolMessage;
+                handleBulletins(bulletinsMessage.getBulletins());
+                return null;
+            default:
+                throw new ProtocolException("No handler defined for message 
type: " + protocolMessage.getType());
+        }
+    }
+
+    /**
+     * Services connection requests. If the data flow management service is
+     * unable to provide a current copy of the data flow, then the returned
+     * connection response will indicate the node should try later. Otherwise,
+     * the connection response will contain the flow and the node
+     * identifier.
+     *
+     * If this instance is configured with a firewall and the request is
+     * blocked, then the response will not contain a node identifier.
+     *
+     * @param request a connection request
+     *
+     * @return a connection response
+     */
+    @Override
+    public ConnectionResponse requestConnection(final ConnectionRequest request) {
+        final boolean lockObtained = writeLock.tryLock(3, TimeUnit.SECONDS);
+        if (!lockObtained) {
+            // Create try-later response because we are too busy to service the request right now. We do not want
+            // to wait long because we want Node/NCM comms to be very responsive
+            final int tryAgainSeconds;
+            if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
+                tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
+            } else {
+                tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
+            }
+
+            // record event
+            final String msg = "Connection requested from node, but manager 
was too busy to service request.  Instructing node to try again in " + 
tryAgainSeconds + " seconds.";
+            addEvent(request.getProposedNodeIdentifier(), msg);
+            addBulletin(request.getProposedNodeIdentifier(), Severity.INFO, 
msg);
+
+            // return try later response
+            return new ConnectionResponse(tryAgainSeconds);
+        }
+
+        try {
+            // resolve the proposed node identifier to a valid node identifier
+            final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(request.getProposedNodeIdentifier());
+
+            if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
+                // if the socket address is not listed in the firewall, then return a null response
+                logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
+                return ConnectionResponse.createBlockedByFirewallResponse();
+            }
+
+            // get a raw reference to the node (if it doesn't exist, node will be null)
+            Node node = getRawNode(resolvedNodeIdentifier.getId());
+
+            // create a new node if necessary and set status to connecting
+            if (node == null) {
+                node = new Node(resolvedNodeIdentifier, Status.CONNECTING);
+                addEvent(node.getNodeId(), "Connection requested from new 
node.  Setting status to connecting.");
+                nodes.add(node);
+            } else {
+                node.setStatus(Status.CONNECTING);
+                addEvent(resolvedNodeIdentifier, "Connection requested from 
existing node.  Setting status to connecting");
+            }
+
+            // record the time of the connection request
+            node.setConnectionRequestedTimestamp(new Date().getTime());
+
+            // clear out old heartbeat info
+            node.setHeartbeat(null);
+
+            // try to obtain a current flow
+            if (dataFlowManagementService.isFlowCurrent()) {
+                // if a cached copy does not exist, load it from disk
+                if (cachedDataFlow == null) {
+                    final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+                    cachedDataFlow = clusterDataFlow.getDataFlow();
+                    primaryNodeId = clusterDataFlow.getPrimaryNodeId();
+                }
+
+                // determine if this node should be assigned the primary role
+                final boolean primaryRole;
+                if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
+                    setPrimaryNodeId(node.getNodeId());
+                    addEvent(node.getNodeId(), "Setting primary role in 
connection response.");
+                    primaryRole = true;
+                } else {
+                    primaryRole = false;
+                }
+
+                return new ConnectionResponse(node.getNodeId(), cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId);
+            }
+
+            /*
+             * The manager does not have a current copy of the data flow, 
+             * so it will instruct the node to try connecting at a later 
+             * time.  Meanwhile, the flow will be locked down from user 
+             * changes because the node is marked as connecting.
+             */
+
+            /*
+             * Create try-later response based on flow retrieval delay to give 
+             * the flow management service a chance to retrieve a current flow
+             */
+            final int tryAgainSeconds;
+            if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
+                tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
+            } else {
+                tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
+            }
+
+            // record event
+            addEvent(node.getNodeId(), "Connection requested from node, but 
manager was unable to obtain current flow.  Instructing node to try again in " 
+ tryAgainSeconds + " seconds.");
+
+            // return try later response
+            return new ConnectionResponse(tryAgainSeconds);
+
+        } finally {
+            writeLock.unlock("requestConnection");
+        }
+    }
+
+    /**
+     * Services reconnection requests for a given node. If the node indicates
+     * reconnection failure, then the node will be set to disconnected and if
+     * the node has primary role, then the role will be revoked. Otherwise, a
+     * reconnection request will be sent to the node, initiating the connection
+     * handshake.
+     *
+     * @param nodeId a node identifier
+     *
+     * @throws UnknownNodeException if the node does not exist
+     * @throws IllegalNodeReconnectionException if the node cannot be
+     * reconnected because the node is not disconnected
+     * @throws NodeReconnectionException if the reconnection message failed to
+     * be sent or the cluster could not provide a current data flow for the
+     * reconnection request
+     */
+    @Override
+    public void requestReconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeReconnectionException {
+        Node node = null;
+
+        final boolean primaryRole;
+        final int tryAgainSeconds;
+
+        writeLock.lock();
+        try {
+            // check if we know about this node and that it is disconnected
+            node = getRawNode(nodeId);
+            logger.info("Request was made by {} to reconnect node {} to 
cluster", userDn, node == null ? nodeId : node);
+
+            if (node == null) {
+                throw new UnknownNodeException("Node does not exist.");
+            } else if (Status.DISCONNECTED != node.getStatus()) {
+                throw new IllegalNodeReconnectionException("Node must be 
disconnected before it can reconnect.");
+            }
+
+            // clear out old heartbeat info
+            node.setHeartbeat(null);
+
+            // get the dataflow to send with the reconnection request
+            if (!dataFlowManagementService.isFlowCurrent()) {
+                /* node remains disconnected */
+                final String msg = "Reconnection requested for node, but 
manager was unable to obtain current flow.  Setting node to disconnected.";
+                addEvent(node.getNodeId(), msg);
+                addBulletin(node, Severity.WARNING, msg);
+                throw new NodeReconnectionException("Manager was unable to 
obtain current flow to provide in reconnection request to node.  Try again in a 
few seconds.");
+            }
+
+            // if a cached copy does not exist, load it from disk
+            if (cachedDataFlow == null) {
+                final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
+                cachedDataFlow = clusterDataFlow.getDataFlow();
+                primaryNodeId = clusterDataFlow.getPrimaryNodeId();
+            }
+
+            node.setStatus(Status.CONNECTING);
+            addEvent(node.getNodeId(), "Reconnection requested for node.  
Setting status to connecting.");
+
+            // determine if this node should be assigned the primary role
+            if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
+                setPrimaryNodeId(node.getNodeId());
+                addEvent(node.getNodeId(), "Setting primary role in 
reconnection request.");
+                primaryRole = true;
+            } else {
+                primaryRole = false;
+            }
+
+            if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
+                tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
+            } else {
+                tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
+            }
+        } catch (final UnknownNodeException | IllegalNodeReconnectionException | NodeReconnectionException une) {
+            throw une;
+        } catch (final Exception ex) {
+            logger.warn("Problem encountered issuing reconnection request to 
node " + node.getNodeId() + " due to: " + ex, ex);
+
+            node.setStatus(Status.DISCONNECTED);
+            final String eventMsg = "Problem encountered issuing reconnection 
request. Node will remain disconnected: " + ex;
+            addEvent(node.getNodeId(), eventMsg);
+            addBulletin(node, Severity.WARNING, eventMsg);
+
+            // Exception thrown will include node ID but event/bulletin do not because the node/id is passed along with the message
+            throw new NodeReconnectionException("Problem encountered issuing reconnection request to " + node.getNodeId() + ". Node will remain disconnected: " + ex, ex);
+        } finally {
+            writeLock.unlock("requestReconnection");
+        }
+
+        // Asynchronously start attempting reconnection. This is not completely thread-safe, as
+        // we do this by releasing the write lock and then obtaining a read lock for each attempt,
+        // so we suffer from the ABA problem. However, we are willing to accept the consequences of
+        // this situation in order to avoid holding a lock for the entire duration. "The consequences"
+        // are that a second thread could potentially be doing the same thing, issuing a reconnection request.
+        // However, this is very unlikely to happen, based on the conditions under which we issue a reconnection
+        // request. And if we do, the node will simply reconnect multiple times, which is not a big deal.
+        requestReconnectionAsynchronously(node, primaryRole, 10, tryAgainSeconds);
+    }
+
+    private void requestReconnectionAsynchronously(final Node node, final boolean primaryRole, final int reconnectionAttempts, final int retrySeconds) {
+        final Thread reconnectionThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < reconnectionAttempts; i++) {
+                    final ReconnectionRequestMessage request = new ReconnectionRequestMessage();
+
+                    try {
+                        readLock.lock();
+                        try {
+                            if (Status.CONNECTING != node.getStatus()) {
+                                // the node status has changed. It's no longer appropriate to attempt reconnection.
+                                return;
+                            }
+
+                            // create the request
+                            request.setNodeId(node.getNodeId());
+                            request.setDataFlow(cachedDataFlow);
+                            request.setPrimary(primaryRole);
+                            request.setManagerRemoteSiteCommsSecure(remoteCommsSecure);
+                            request.setManagerRemoteSiteListeningPort(remoteInputPort);
+                            request.setInstanceId(instanceId);
+                        } finally {
+                            readLock.unlock("Reconnect " + node.getNodeId());
+                        }
+
+                        // Issue a reconnection request to the node.
+                        senderListener.requestReconnection(request);
+
+                        node.setConnectionRequestedTimestamp(System.currentTimeMillis());
+
+                        // successfully told node to reconnect -- we're done!
+                        return;
+                    } catch (final Exception e) {
+                        logger.warn("Problem encountered issuing reconnection 
request to node " + node.getNodeId() + " due to: " + e);
+                        if (logger.isDebugEnabled()) {
+                            logger.warn("", e);
+                        }
+
+                        addBulletin(node, Severity.WARNING, "Problem 
encountered issuing reconnection request to node " + node.getNodeId() + " due 
to: " + e);
+                    }
+
+                    try {
+                        Thread.sleep(1000L * retrySeconds);
+                    } catch (final InterruptedException ie) {
+                        break;
+                    }
+                }
+
+                // We failed to reconnect 10 times. We must now mark node as disconnected.
+                writeLock.lock();
+                try {
+                    if (Status.CONNECTING == node.getStatus()) {
+                        requestDisconnectionQuietly(node.getNodeId(), "Failed 
to issue Reconnection Request " + reconnectionAttempts + " times");
+                    }
+                } finally {
+                    writeLock.unlock("Mark node as Disconnected as a result of 
reconnection failure");
+                }
+            }
+        }, "Reconnect " + node.getNodeId());
+
+        reconnectionThread.start();
+    }
+
+    private List<ReportingTaskNode> loadReportingTasks(final File taskConfigXml) {
+        final List<ReportingTaskNode> tasks = new ArrayList<>();
+        if (taskConfigXml == null) {
+            logger.info("No controller tasks to start");
+            return tasks;
+        }
+
+        try {
+            final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd");
+            final Document document = parse(taskConfigXml, schemaUrl);
+
+            final NodeList tasksNodes = document.getElementsByTagName("tasks");
+            final Element tasksElement = (Element) tasksNodes.item(0);
+
+            //optional properties for all ReportingTasks
+            for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) {
+                //add global properties common to all tasks
+                Map<String, String> properties = new HashMap<>();
+
+                //get properties for the specific reporting task - id, name, class,
+                //and schedulingPeriod must be set
+                final String taskId = DomUtils.getChild(taskElement, "id").getTextContent().trim();
+                final String taskName = DomUtils.getChild(taskElement, "name").getTextContent().trim();
+
+                final List<Element> schedulingStrategyNodeList = DomUtils.getChildElementsByTagName(taskElement, "schedulingStrategy");
+                String schedulingStrategyValue = SchedulingStrategy.TIMER_DRIVEN.name();
+                if (schedulingStrategyNodeList.size() == 1) {
+                    final String specifiedValue = schedulingStrategyNodeList.get(0).getTextContent();
+
+                    try {
+                        schedulingStrategyValue = SchedulingStrategy.valueOf(specifiedValue).name();
+                    } catch (final Exception e) {
+                        throw new RuntimeException("Cannot start Reporting Task with id " + taskId + " because its Scheduling Strategy does not have a valid value", e);
+                    }
+                }
+
+                final SchedulingStrategy schedulingStrategy = SchedulingStrategy.valueOf(schedulingStrategyValue);
+                final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim();
+                final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim();
+
+                //optional task-specific properties
+                for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) {
+                    final String name = optionalProperty.getAttribute("name");
+                    final String value = optionalProperty.getTextContent().trim();
+                    properties.put(name, value);
+                }
+
+                //set the class to be used for the configured reporting task
+                final ReportingTaskNode reportingTaskNode;
+                try {
+                    reportingTaskNode = createReportingTask(taskClass, taskId);
+                } catch (final ReportingTaskInstantiationException e) {
+                    logger.error("Unable to load reporting task {} due to {}", 
new Object[]{taskId, e});
+                    if (logger.isDebugEnabled()) {
+                        logger.error("", e);
+                    }
+                    continue;
+                }
+
+                final ReportingTask reportingTask = reportingTaskNode.getReportingTask();
+
+                final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, schedulingStrategy, taskSchedulingPeriod, this);
+                reportingTask.initialize(config);
+
+                final Map<PropertyDescriptor, String> resolvedProps;
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    resolvedProps = new HashMap<>();
+                    for (final Map.Entry<String, String> entry : properties.entrySet()) {
+                        final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey());
+                        resolvedProps.put(descriptor, entry.getValue());
+                    }
+                }
+
+                for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
+                    reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
+                }
+
+                processScheduler.schedule(reportingTaskNode);
+                tasks.add(reportingTaskNode);
+            }
+        } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
+            logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t});
+            if (logger.isDebugEnabled()) {
+                logger.error("", t);
+            }
+        }
+
+        return tasks;
+    }
+
+    private ReportingTaskNode createReportingTask(final String type, final String id) throws ReportingTaskInstantiationException {
+        if (type == null) {
+            throw new NullPointerException();
+        }
+        ReportingTask task = null;
+        final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type);
+            final Class<?> rawClass;
+            if (detectedClassLoader == null) {
+                rawClass = Class.forName(type);
+            } else {
+                rawClass = Class.forName(type, false, detectedClassLoader);
+            }
+
+            Thread.currentThread().setContextClassLoader(detectedClassLoader);
+            final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
+            final Object reportingTaskObj = reportingTaskClass.newInstance();
+            task = reportingTaskClass.cast(reportingTaskObj);
+        } catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) {
+            throw new ReportingTaskInstantiationException(type, t);
+        } finally {
+            if (ctxClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(ctxClassLoader);
+            }
+        }
+
+        final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
+        final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler,
+                new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory);
+        return taskNode;
+    }
+
+    private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException {
+        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+        final Schema schema = schemaFactory.newSchema(schemaUrl);
+        final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+        docFactory.setSchema(schema);
+        final DocumentBuilder builder = docFactory.newDocumentBuilder();
+
+        builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
+            @Override
+            public void fatalError(final SAXParseException err) throws SAXException {
+                logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
+                if (logger.isDebugEnabled()) {
+                    logger.error("Error Stack Dump", err);
+                }
+                throw err;
+            }
+
+            @Override
+            public void error(final SAXParseException err) throws SAXParseException {
+                logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
+                if (logger.isDebugEnabled()) {
+                    logger.error("Error Stack Dump", err);
+                }
+                throw err;
+            }
+
+            @Override
+            public void warning(final SAXParseException err) throws SAXParseException {
+                logger.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage());
+                if (logger.isDebugEnabled()) {
+                    logger.warn("Warning stack dump", err);
+                }
+                throw err;
+            }
+        });
+
+        // build the document
+        final Document document = builder.parse(xmlFile);
+
+        // ensure schema compliance
+        final Validator validator = schema.newValidator();
+        validator.validate(new DOMSource(document));
+
+        return document;
+    }
+
+    private void addBulletin(final Node node, final Severity severity, final String msg) {
+        addBulletin(node.getNodeId(), severity, msg);
+    }
+
+    private void addBulletin(final NodeIdentifier nodeId, final Severity severity, final String msg) {
+        bulletinRepository.addBulletin(BulletinFactory.createBulletin(BULLETIN_CATEGORY, severity.toString(),
+                nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg));
+    }
+
+    /**
+     * Services a disconnection request.
+     *
+     * @param nodeId a node identifier
+     * @param userDn the DN of the user requesting the disconnection
+     *
+     * @throws UnknownNodeException if the node does not exist
+     * @throws IllegalNodeDisconnectionException if the node cannot be
+     * disconnected due to the cluster's state (e.g., node is last connected
+     * node or node is primary)
+     * @throws NodeDisconnectionException if the disconnection message fails to
+     * be sent.
+     */
+    @Override
+    public void requestDisconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException {
+        writeLock.lock();
+        try {
+            // check that the node is known
+            final Node node = getNode(nodeId);
+            if (node == null) {
+                throw new UnknownNodeException("Node does not exist.");
+            }
+            requestDisconnection(node.getNodeId(), /* ignore last node */ false, "User " + userDn + " Disconnected Node");
+        } finally {
+            writeLock.unlock("requestDisconnection(String)");
+        }
+    }
+
+    /**
+     * Requests disconnection of the node with the given node ID; any
+     * exception thrown is suppressed.
+     *
+     * @param nodeId the node ID
+     */
+    private void requestDisconnectionQuietly(final NodeIdentifier nodeId, final String explanation) {
+        try {
+            requestDisconnection(nodeId, /* ignore node check */ true, explanation);
+        } catch (final IllegalNodeDisconnectionException | NodeDisconnectionException ex) { /* suppress exception */ }
+    }
+
+    /**
+     * Issues a disconnection message to the node identified by the given node
+     * ID. If the node is not known, then an UnknownNodeException is thrown. If
+     * the node cannot be disconnected due to the cluster's state and
+     * ignoreLastNodeCheck is false, then an IllegalNodeDisconnectionException is
+     * thrown. Otherwise, a disconnection message is issued to the node.
+     *
+     * Whether or not the disconnection message is successfully sent to the node,
+     * the node is marked as disconnected and if the node is the primary node, then
+     * the primary role is revoked.
+     *
+     * @param nodeId the ID of the node
+     * @param ignoreNodeChecks if false, checks will be made to ensure the
+     * cluster supports the node's disconnection (e.g., the node is not the last
+     * connected node in the cluster; the node is not the primary); otherwise,
+     * the request is made regardless of the cluster state
+     * @param explanation the reason for the disconnection, used in events and bulletins
+     *
+     * @throws IllegalNodeDisconnectionException if the node cannot be
+     * disconnected due to the cluster's state (e.g., node is last connected
+     * node or node is primary). Not thrown if ignoreNodeChecks is true.
+     * @throws NodeDisconnectionException if the disconnection message fails to
+     * be sent.
+     */
+    private void requestDisconnection(final NodeIdentifier nodeId, final 
boolean ignoreNodeChecks, final String explanation)
+            throws IllegalNodeDisconnectionException, 
NodeDisconnectionException {
+
+        writeLock.lock();
+        try {
+
+            // check that the node is known
+            final Node node = getRawNode(nodeId.getId());
+            if (node == null) {
+                if (ignoreNodeChecks) {
+                    // issue the disconnection
+                    final DisconnectMessage request = new DisconnectMessage();
+                    request.setNodeId(nodeId);
+                    request.setExplanation(explanation);
+
+                    addEvent(nodeId, "Disconnection requested due to " + 
explanation);
+                    senderListener.disconnect(request);
+                    addEvent(nodeId, "Node disconnected due to " + 
explanation);
+                    addBulletin(nodeId, Severity.INFO, "Node disconnected due 
to " + explanation);
+                    return;
+                } else {
+                    throw new UnknownNodeException("Node does not exist.");
+                }
+            }
+
+            // if necessary, check that the node may be disconnected
+            if (!ignoreNodeChecks) {
+                final Set<NodeIdentifier> connectedNodes = 
getNodeIds(Status.CONNECTED);
+                // cannot disconnect the last connected node in the cluster
+                if (connectedNodes.size() == 1 && 
connectedNodes.iterator().next().equals(nodeId)) {
+                    throw new IllegalNodeDisconnectionException("Node may not 
be disconnected because it is the only connected node in the cluster.");
+                } else if (isPrimaryNode(nodeId)) {
+                    // cannot disconnect the primary node in the cluster
+                    throw new IllegalNodeDisconnectionException("Node may not 
be disconnected because it is the primary node in the cluster.");
+                }
+            }
+
+            // update status
+            node.setStatus(Status.DISCONNECTED);
+            notifyDataFlowManagementServiceOfNodeStatusChange();
+
+            // issue the disconnection
+            final DisconnectMessage request = new DisconnectMessage();
+            request.setNodeId(nodeId);
+            request.setExplanation(explanation);
+
+            addEvent(nodeId, "Disconnection requested due to " + explanation);
+            senderListener.disconnect(request);
+            addEvent(nodeId, "Node disconnected due to " + explanation);
+            addBulletin(node, Severity.INFO, "Node disconnected due to " + 
explanation);
+        } finally {
+            writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)");
+        }
+    }
+
+    /**
+     * Sends a message to the node assigning it the primary role. If the
+     * messaging fails, then the node is marked as disconnected.
+     *
+     * @param nodeId the ID of the node to assign the primary role
+     *
+     * @return true if the primary role was assigned; false otherwise
+     */
+    private boolean assignPrimaryRole(final NodeIdentifier nodeId) {
+        writeLock.lock();
+        try {
+            // create primary role message
+            final PrimaryRoleAssignmentMessage msg = new 
PrimaryRoleAssignmentMessage();
+            msg.setNodeId(nodeId);
+            msg.setPrimary(true);
+            logger.info("Attempting to assign primary role to node: " + 
nodeId);
+
+            // send message
+            senderListener.assignPrimaryRole(msg);
+
+            logger.info("Assigned primary role to node: " + nodeId);
+            addBulletin(nodeId, Severity.INFO, "Node assigned primary role");
+
+            // true indicates primary role assigned
+            return true;
+
+        } catch (final ProtocolException ex) {
+
+            logger.warn("Failed attempt to assign primary role to node " + 
nodeId + " due to " + ex);
+            addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role 
to node due to: " + ex);
+
+            // mark node as disconnected and log/record the event, guarding
+            // against the node having been removed in the meantime
+            final Node node = getRawNode(nodeId.getId());
+            if (node != null) {
+                node.setStatus(Status.DISCONNECTED);
+                addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role.");
+            }
+
+            addBulletin(nodeId, Severity.WARNING, "Node disconnected because 
of failed attempt to assign primary role");
+
+            // false indicates primary role failed to be assigned
+            return false;
+        } finally {
+            writeLock.unlock("assignPrimaryRole");
+        }
+    }
+
+    /**
+     * Sends a message to the node with the given node ID revoking its primary
+     * role. If the messaging fails, then the node is marked as disconnected.
+     *
+     * @param nodeId the ID of the node from which to revoke the primary role
+     *
+     * @return true if the primary role was revoked from the node; false
+     * otherwise
+     */
+    private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
+        writeLock.lock();
+        try {
+            // create primary role message
+            final PrimaryRoleAssignmentMessage msg = new 
PrimaryRoleAssignmentMessage();
+            msg.setNodeId(nodeId);
+            msg.setPrimary(false);
+            logger.info("Attempting to revoke primary role from node: " + 
nodeId);
+
+            // send message
+            senderListener.assignPrimaryRole(msg);
+
+            logger.info("Revoked primary role from node: " + nodeId);
+            addBulletin(nodeId, Severity.INFO, "Primary Role revoked from 
node");
+
+            // true indicates primary role was revoked
+            return true;
+        } catch (final ProtocolException ex) {
+
+            logger.warn("Failed attempt to revoke primary role from node " + 
nodeId + " due to " + ex);
+
+            // mark node as disconnected and log/record the event, guarding
+            // against the node having been removed in the meantime
+            final Node node = getRawNode(nodeId.getId());
+            if (node != null) {
+                node.setStatus(Status.DISCONNECTED);
+                addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role.");
+                addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role");
+            }
+
+            // false indicates primary role failed to be revoked
+            return false;
+        } finally {
+            writeLock.unlock("revokePrimaryRole");
+        }
+    }
+
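+    /**
+     * Returns a copy of the given node identifier with the requestor's DN
+     * attached; the original identifier is not modified.
+     */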
+    private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final 
String dn) {
+        return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(),
+                nodeId.getApiPort(), nodeId.getSocketAddress(), 
nodeId.getSocketPort(), dn);
+    }
+
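+    /**
+     * Handles a connection request by tagging the proposed node identifier
+     * with the requestor's DN and delegating to requestConnection.
+     */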
+    private ConnectionResponseMessage handleConnectionRequest(final 
ConnectionRequestMessage requestMessage) {
+        final NodeIdentifier proposedIdentifier = 
requestMessage.getConnectionRequest().getProposedNodeIdentifier();
+        final ConnectionRequest requestWithDn = new 
ConnectionRequest(addRequestorDn(proposedIdentifier, 
requestMessage.getRequestorDN()));
+
+        final ConnectionResponse response = requestConnection(requestWithDn);
+        final ConnectionResponseMessage responseMessage = new 
ConnectionResponseMessage();
+        responseMessage.setConnectionResponse(response);
+        return responseMessage;
+    }
+
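+    /**
+     * Marks the node as disconnected when it reports that its controller
+     * failed to start up, recording the failure as an event and a bulletin.
+     */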
+    private void handleControllerStartupFailure(final 
ControllerStartupFailureMessage msg) {
+        writeLock.lock();
+        try {
+            final Node node = getRawNode(msg.getNodeId().getId());
+            if (node != null) {
+                node.setStatus(Status.DISCONNECTED);
+                addEvent(msg.getNodeId(), "Node could not join cluster because 
it failed to start up properly. Setting node to Disconnected. Node reported the 
following error: " + msg.getExceptionMessage());
+                addBulletin(node, Severity.ERROR, "Node could not join cluster 
because it failed to start up properly. Setting node to Disconnected. Node 
reported the following error: " + msg.getExceptionMessage());
+            }
+        } finally {
+            writeLock.unlock("handleControllerStartupFailure");
+        }
+    }
+
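+    /**
+     * Marks the node as disconnected when it reports that it failed to
+     * rejoin the cluster, recording the failure as an event and a bulletin.
+     */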
+    private void handleReconnectionFailure(final ReconnectionFailureMessage 
msg) {
+        writeLock.lock();
+        try {
+            final Node node = getRawNode(msg.getNodeId().getId());
+            if (node != null) {
+                node.setStatus(Status.DISCONNECTED);
+                final String errorMsg = "Node could not rejoin cluster. 
Setting node to Disconnected. Node reported the following error: " + 
msg.getExceptionMessage();
+                addEvent(msg.getNodeId(), errorMsg);
+                addBulletin(node, Severity.ERROR, errorMsg);
+            }
+        } finally {
+            writeLock.unlock("handleControllerStartupFailure");
+        }
+    }
+
+    /**
+     * Adds an instance of the specified controller service.
+     *
+     * @param type the fully qualified class name of the controller service
+     * @param id the identifier to assign to the controller service
+     * @param properties the initial configuration properties for the service
+     * @return the node wrapping the newly created controller service
+     */
+    @Override
+    public ControllerServiceNode createControllerService(String type, String 
id, Map<String, String> properties) {
+        return controllerServiceProvider.createControllerService(type, id, 
properties);
+    }
+
+    @Override
+    public ControllerService getControllerService(String serviceIdentifier) {
+        return 
controllerServiceProvider.getControllerService(serviceIdentifier);
+    }
+
+    @Override
+    public ControllerServiceNode getControllerServiceNode(final String id) {
+        return controllerServiceProvider.getControllerServiceNode(id);
+    }
+
+    @Override
+    public boolean isControllerServiceEnabled(final ControllerService service) 
{
+        return controllerServiceProvider.isControllerServiceEnabled(service);
+    }
+
+    @Override
+    public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+        return 
controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
+    }
+
+    /**
+     * Handles a bulletins message by unmarshalling the payload, tagging each
+     * bulletin with the reporting node's address, and adding it to the
+     * bulletin repository.
+     *
+     * @param bulletins the bulletins reported by a node
+     */
+    public void handleBulletins(final NodeBulletins bulletins) {
+        final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
+        final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + 
nodeIdentifier.getApiPort();
+
+        // unmarshal the message
+        BulletinsPayload payload = 
BulletinsPayload.unmarshal(bulletins.getPayload());
+        for (final Bulletin bulletin : payload.getBulletins()) {
+            bulletin.setNodeAddress(nodeAddress);
+            bulletinRepository.addBulletin(bulletin);
+        }
+    }
+
+    /**
+     * Handles a node's heartbeat. If this heartbeat is a node's first 
heartbeat
+     * since its connection request, then the manager will mark the node as
+     * connected. If the node was previously disconnected due to a lack of
+     * heartbeat, then a reconnection request is issued. If the node was
+     * disconnected for other reasons, then a disconnection request is issued.
+     * If this instance is configured with a firewall and the heartbeat is
+     * blocked, then a disconnection request is issued.
+     *
+     * @param heartbeat the heartbeat to process
+     */
+    @Override
+    public void handleHeartbeat(final Heartbeat heartbeat) {
+        // sanity check heartbeat
+        if (heartbeat == null) {
+            throw new IllegalArgumentException("Heartbeat may not be null.");
+        } else if (heartbeat.getNodeIdentifier() == null) {
+            throw new IllegalArgumentException("Heartbeat does not contain a 
node ID.");
+        }
+
+        /*
+         * Processing a heartbeat requires a write lock, which may take a while
+         * to obtain.  Only the last heartbeat is necessary to process per 
node.
+         * Furthermore, since many could pile up, heartbeats are processed in 
+         * bulk.
+         * 
+         * The below queue stores the pending heartbeats.
+         */
+        pendingHeartbeats.add(heartbeat);
+    }
+
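+    /**
+     * Drains the pending heartbeat queue, keeping only the most recent
+     * heartbeat per node, and processes the resulting batch while holding
+     * the write lock.
+     */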
+    private void processPendingHeartbeats() {
+        Node node;
+
+        writeLock.lock();
+        try {
+            /*
+             * Get the most recent heartbeats for the nodes in the cluster.  
This
+             * is achieved by "draining" the pending heartbeats queue, 
populating
+             * a map that associates a node identifier with its latest 
heartbeat, and
+             * finally, getting the values of the map.
+             */
+            final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = new 
HashMap<>();
+            Heartbeat aHeartbeat;
+            while ((aHeartbeat = pendingHeartbeats.poll()) != null) {
+                mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), 
aHeartbeat);
+            }
+            final Collection<Heartbeat> mostRecentHeartbeats = new 
ArrayList<>(mostRecentHeartbeatsMap.values());
+
+            // return fast if no work to do
+            if (mostRecentHeartbeats.isEmpty()) {
+                return;
+            }
+
+            logNodes("Before Heartbeat Processing", heartbeatLogger);
+
+            final int numPendingHeartbeats = mostRecentHeartbeats.size();
+            if (heartbeatLogger.isDebugEnabled()) {
+                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", 
numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
+            }
+
+            for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
+                try {
+                    // resolve the proposed node identifier to valid node 
identifier
+                    final NodeIdentifier resolvedNodeIdentifier = 
resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier());
+
+                    // get a raw reference to the node (if it doesn't exist, 
node will be null)
+                    node = getRawNode(resolvedNodeIdentifier.getId());
+
+                    // if the node thinks it has the primary role, but the 
manager has assigned the role to a different node, then revoke the role
+                    if (mostRecentHeartbeat.isPrimary() && 
!isPrimaryNode(resolvedNodeIdentifier)) {
+                        addEvent(resolvedNodeIdentifier, "Heartbeat indicates 
node is running as primary node.  Revoking primary role because primary role is 
assigned to a different node.");
+                        revokePrimaryRole(resolvedNodeIdentifier);
+                    }
+
+                    final boolean heartbeatIndicatesNotYetConnected = 
!mostRecentHeartbeat.isConnected();
+
+                    if 
(isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
+                        if (node == null) {
+                            logger.info("Firewall blocked heartbeat received 
from unknown node " + resolvedNodeIdentifier + ".  Issuing disconnection 
request.");
+                        } else {
+                            // record event
+                            addEvent(resolvedNodeIdentifier, "Firewall blocked 
received heartbeat.  Issuing disconnection request.");
+                        }
+
+                        // request node to disconnect
+                        requestDisconnectionQuietly(resolvedNodeIdentifier, 
"Blocked By Firewall");
+
+                    } else if (node == null) {  // unknown node, so issue 
reconnect request
+                        // create new node and add to node set
+                        final Node newNode = new Node(resolvedNodeIdentifier, 
Status.DISCONNECTED);
+                        nodes.add(newNode);
+
+                        // record event
+                        addEvent(newNode.getNodeId(), "Received heartbeat from 
unknown node.  Issuing reconnection request.");
+
+                        // record heartbeat
+                        newNode.setHeartbeat(mostRecentHeartbeat);
+                        requestReconnection(resolvedNodeIdentifier.getId(), 
"NCM Heartbeat Processing");
+                    } else if (heartbeatIndicatesNotYetConnected) {
+                        if (Status.CONNECTED == node.getStatus()) {
+                            // record event
+                            addEvent(node.getNodeId(), "Received heartbeat 
from node that thinks it is not yet part of the cluster, though the Manager 
thought it was. Marking as Disconnected and issuing reconnection request.");
+
+                            // record heartbeat
+                            node.setHeartbeat(null);
+                            node.setStatus(Status.DISCONNECTED);
+
+                            
requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
+                        }
+                    } else if (Status.DISCONNECTED == node.getStatus()) {
+                        // ignore heartbeats from nodes disconnected by means 
other than lack of heartbeat, unless it is
+                        // the only node. We allow it if it is the only node 
because if we have a one-node cluster, then
+                        // we cannot manually reconnect it.
+                        if (node.isHeartbeatDisconnection() || nodes.size() == 
1) {
+                            // record event
+                            if (node.isHeartbeatDisconnection()) {
+                                addEvent(resolvedNodeIdentifier, "Received 
heartbeat from node previously disconnected due to lack of heartbeat.  Issuing 
reconnection request.");
+                            } else {
+                                addEvent(resolvedNodeIdentifier, "Received 
heartbeat from node previously disconnected, but it is the only known node, so 
issuing reconnection request.");
+                            }
+
+                            // record heartbeat
+                            node.setHeartbeat(mostRecentHeartbeat);
+
+                            // request reconnection
+                            
requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
+                        } else {
+                            // disconnected nodes should not heartbeat, so we 
need to issue a disconnection request
+                            heartbeatLogger.info("Ignoring received heartbeat 
from disconnected node " + resolvedNodeIdentifier + ".  Issuing disconnection 
request.");
+
+                            // request node to disconnect
+                            
requestDisconnectionQuietly(resolvedNodeIdentifier, "Received Heartbeat from 
Node, but Manager has already marked Node as Disconnected");
+                        }
+
+                    } else if (Status.DISCONNECTING == node.getStatus()) {
+                        /* ignore spurious heartbeat */
+                    } else {  // node is either connected or connecting
+                        // first heartbeat causes status change from 
connecting to connected
+                        if (Status.CONNECTING == node.getStatus()) {
+                            if (mostRecentHeartbeat.getCreatedTimestamp() < 
node.getConnectionRequestedTimestamp()) {
+                                heartbeatLogger.info("Received heartbeat for 
node " + resolvedNodeIdentifier + " but ignoring because it was generated 
before the node was last asked to reconnect.");
+                                continue;
+                            }
+
+                            // set status to connected
+                            node.setStatus(Status.CONNECTED);
+
+                            // record event
+                            addEvent(resolvedNodeIdentifier, "Received first 
heartbeat from connecting node.  Setting node to connected.");
+
+                            // notify service of updated node set
+                            
notifyDataFlowManagementServiceOfNodeStatusChange();
+
+                            addBulletin(node, Severity.INFO, "Node Connected");
+                        } else {
+                            heartbeatLogger.info("Received heartbeat for node 
" + resolvedNodeIdentifier + ".");
+                        }
+
+                        // record heartbeat
+                        node.setHeartbeat(mostRecentHeartbeat);
+
+                        ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
+                        if (statusRepository == null) {
+                            statusRepository = 
createComponentStatusRepository();
+                            
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
+                        }
+
+                        // If it's been a while since we've captured, capture 
this metric.
+                        final Date lastCaptureDate = 
statusRepository.getLastCaptureDate();
+                        final long millisSinceLastCapture = (lastCaptureDate 
== null) ? Long.MAX_VALUE : (System.currentTimeMillis() - 
lastCaptureDate.getTime());
+
+                        if (millisSinceLastCapture > 
componentStatusSnapshotMillis) {
+                            
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
+                        }
+                    }
+                } catch (final Exception e) {
+                    logger.error("Failed to process heartbeat from {}:{} due 
to {}", mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), 
mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
+                    if (logger.isDebugEnabled()) {
+                        logger.error("", e);
+                    }
+                }
+            }
+
+            logNodes("After Heartbeat Processing", heartbeatLogger);
+        } finally {
+            writeLock.unlock("processPendingHeartbeats");
+        }
+    }
+
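+    /**
+     * Instantiates the ComponentStatusRepository implementation named in the
+     * NiFi properties, loading it via the NAR thread context class loader.
+     */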
+    private ComponentStatusRepository createComponentStatusRepository() {
+        final String implementationClassName = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION,
 DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
+        if (implementationClassName == null) {
+            throw new RuntimeException("Cannot create Component Status 
Repository because the NiFi Properties is missing the following property: "
+                    + 
NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+        }
+
+        try {
+            return 
NarThreadContextClassLoader.createInstance(implementationClassName, 
ComponentStatusRepository.class);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
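+    /*
+     * Note: getNodes and getNode return defensive clones of the Node objects
+     * so that callers cannot mutate the manager's internal node state.
+     */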
+    @Override
+    public Set<Node> getNodes(final Status... statuses) {
+        final Set<Status> desiredStatusSet = new HashSet<>();
+        for (final Status status : statuses) {
+            desiredStatusSet.add(status);
+        }
+
+        readLock.lock();
+        try {
+            final Set<Node> clonedNodes = new HashSet<>();
+            for (final Node node : nodes) {
+                if (desiredStatusSet.isEmpty() || 
desiredStatusSet.contains(node.getStatus())) {
+                    clonedNodes.add(node.clone());
+                }
+            }
+            return Collections.unmodifiableSet(clonedNodes);
+        } finally {
+            readLock.unlock("getNodes(Status...)");
+        }
+    }
+
+    @Override
+    public Node getNode(final String nodeId) {
+        readLock.lock();
+        try {
+            for (final Node node : nodes) {
+                if (node.getNodeId().getId().equals(nodeId)) {
+                    return node.clone();
+                }
+            }
+            return null;
+        } finally {
+            readLock.unlock("getNode(String)");
+        }
+    }
+
+    @Override
+    public Node getPrimaryNode() {
+        readLock.lock();
+        try {
+            if (primaryNodeId == null) {
+                return null;
+            } else {
+                return getNode(primaryNodeId.getId());
+            }
+        } finally {
+            readLock.unlock("getPrimaryNode");
+        }
+    }
+
+    @Override
+    public void deleteNode(final String nodeId, final String userDn) throws 
UnknownNodeException, IllegalNodeDeletionException {
+        writeLock.lock();
+        try {
+            final Node node = getNode(nodeId);
+            if (node == null) {
+                throw new UnknownNodeException("Node does not exist.");
+            } else if (Status.DISCONNECTED == node.getStatus()) {
+                nodes.remove(node);
+
+                if (eventManager != null) {
+                    eventManager.clearEventHistory(node.getNodeId().getId());
+                }
+
+                logger.info("Removing node {} from cluster because this action 
was requested by {}", node, userDn);
+            } else {
+                throw new IllegalNodeDeletionException("Node may not be 
deleted because it is not disconnected.");
+            }
+        } finally {
+            writeLock.unlock("deleteNode");
+        }
+    }
+
+    @Override
+    public Set<NodeIdentifier> getNodeIds(final Status... statuses) {
+        readLock.lock();
+        try {
+            final Set<NodeIdentifier> nodeIds = new HashSet<>();
+            for (final Node node : nodes) {
+                if (statuses == null || statuses.length == 0) {
+                    nodeIds.add(node.getNodeId());
+                } else {
+                    for (final Node.Status status : statuses) {
+                        if (node.getStatus() == status) {
+                            nodeIds.add(node.getNodeId());
+                            break;
+                        }
+                    }
+                }
+            }
+            return nodeIds;
+        } finally {
+            readLock.unlock("getNodeIds(Status...)");
+        }
+    }
+
+    @Override
+    public void setPrimaryNode(final String nodeId, final String userDn) 
throws UnknownNodeException, IneligiblePrimaryNodeException, 
PrimaryRoleAssignmentException {
+        writeLock.lock();
+        try {
+
+            final Node node = getNode(nodeId);
+            if (node == null) {
+                throw new UnknownNodeException("Node does not exist.");
+            } else if (Status.CONNECTED != node.getStatus()) {
+                throw new IneligiblePrimaryNodeException("Node must be 
connected before it can be assigned as the primary node.");
+            }
+
+            // revoke primary role
+            final Node primaryNode;
+            if ((primaryNode = getPrimaryNode()) != null) {
+                if (primaryNode.getStatus() == Status.DISCONNECTED) {
+                    throw new PrimaryRoleAssignmentException("A disconnected primary node exists.  Delete the node before assigning the primary role to a different node.");
+                } else if (revokePrimaryRole(primaryNode.getNodeId())) {
+                    addEvent(primaryNode.getNodeId(), "Role revoked from this 
node as part of primary role reassignment.");
+                } else {
+                    throw new PrimaryRoleAssignmentException(
+                            "Failed to revoke primary role from node. Primary 
node is now disconnected. Delete the node before assigning the primary role to 
a different node.");
+                }
+            }
+
+            // change the primary node ID to the given node
+            setPrimaryNodeId(node.getNodeId());
+
+            // assign primary role
+            if (assignPrimaryRole(node.getNodeId())) {
+                addEvent(node.getNodeId(), "Role assigned to this node as part 
of primary role reassignment. Action performed by " + userDn);
+                addBulletin(node, Severity.INFO, "Primary Role assigned to 
node by " + userDn);
+            } else {
+                throw new PrimaryRoleAssignmentException(
+                        "Cluster manager assigned primary role to node, but 
the node failed to accept the assignment.  Cluster manager disconnected node.");
+            }
+        } finally {
+            writeLock.unlock("setPrimaryNode");
+        }
+    }
+
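+    /*
+     * The monitoring interval and maximum heartbeat gap below are derived
+     * from the configured cluster protocol heartbeat interval; the 4x and 8x
+     * multipliers presumably give a node several missed heartbeats of grace
+     * before it is considered lost.
+     */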
+    private int getClusterProtocolHeartbeatSeconds() {
+        return (int) 
FormatUtils.getTimeDuration(properties.getClusterProtocolHeartbeatInterval(), 
TimeUnit.SECONDS);
+    }
+
+    @Override
+    public int getHeartbeatMonitoringIntervalSeconds() {
+        return 4 * getClusterProtocolHeartbeatSeconds();
+    }
+
+    @Override
+    public int getMaxHeartbeatGapSeconds() {
+        return 8 * getClusterProtocolHeartbeatSeconds();
+    }
+
+    @Override
+    public List<Event> getNodeEvents(final String nodeId) {
+        readLock.lock();
+        try {
+            List<Event> events = null;
+            final EventManager eventMgr = eventManager;
+            if (eventMgr != null) {
+                events = eventMgr.getEvents(nodeId);
+            }
+
+            if (events == null) {
+                return Collections.emptyList();
+            } else {
+                return Collections.unmodifiableList(events);
+            }
+        } finally {
+            readLock.unlock("getNodeEvents");
+        }
+    }
+
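+    /*
+     * Federates the request to every connected node. A hypothetical caller,
+     * for illustration only:
+     *
+     *   final NodeResponse response = clusterManager.applyRequest(
+     *           HttpMethod.GET, requestUri, parameters, headers);
+     *
+     * Mutable requests are rejected while the cluster is in safe mode or
+     * while any node is connecting, disconnecting, or disconnected, as
+     * enforced in the overload below that accepts node identifiers.
+     */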
+    @Override
+    public NodeResponse applyRequest(final String method, final URI uri, final 
Map<String, List<String>> parameters, final Map<String, String> headers)
+            throws NoConnectedNodesException, NoResponseFromNodesException, 
UriConstructionException, ConnectingNodeMutableRequestException, 
DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
+        return applyRequest(method, uri, parameters, headers, 
getNodeIds(Status.CONNECTED));
+    }
+
+    @Override
+    public NodeResponse applyRequest(final String method, final URI uri, final 
Map<String, List<String>> parameters, final Map<String, String> headers, final 
Set<NodeIdentifier> nodeIdentifiers)
+            throws NoConnectedNodesException, NoResponseFromNodesException, 
UriConstructionException, ConnectingNodeMutableRequestException, 
DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
+
+        final boolean mutableRequest = canChangeNodeState(method, uri);
+        final ClusterManagerLock lock = mutableRequest ? writeLock : readLock;
+
+        lock.lock();
+        try {
+            // check that the request can be applied
+            if (mutableRequest) {
+                if (isInSafeMode()) {
+                    throw new SafeModeMutableRequestException("Received a 
mutable request [" + method + " -- " + uri + "] while in safe mode");
+                } else if (!getNodeIds(Status.DISCONNECTED, 
Status.DISCONNECTING).isEmpty()) {
+                    throw new 
DisconnectedNodeMutableRequestException("Received a mutable request [" + method 
+ " -- " + uri + "] while a node is disconnected from the cluster");
+                } else if (!getNodeIds(Status.CONNECTING).isEmpty()) {
+                    // if any node is connecting and a request can change the 
flow, then we throw an exception
+                    throw new ConnectingNodeMutableRequestException("Received 
a mutable request [" + method + " -- " + uri + "] while a node is trying to 
connect to the cluster");
+                }
+            }
+
+            final NodeResponse clientResponse = federateRequest(method, uri, 
parameters, null, headers, nodeIdentifiers);
+            if (clientResponse == null) {
+                if (mutableRequest) {
+                    throw new NoConnectedNodesException(String.format("All 
nodes were disconnected as a result of applying request %s %s", method, uri));
+                } else {
+                    throw new NoResponseFromNodesException("No nodes were able 
to process this request.");
+                }
+            } else {
+                return clientResponse;
+            }
+        } finally {
+            lock.unlock("applyRequest(String, URI, Map<String, List<String>>, 
Map<String, String>, Set<NodeIdentifier>");
+        }
+    }
+
+    @Override
+    public NodeResponse applyRequest(final String method, final URI uri, final 
Object entity, final Map<String, String> headers)
+            throws NoConnectedNodesException, NoResponseFromNodesException, 
UriConstructionException, ConnectingNodeMutableRequestException, 
DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
+        return applyRequest(method, uri, entity, headers, 
getNodeIds(Status.CONNECTED));
+    }
+
+    @Override
+    public NodeResponse applyRequest(final String method, final URI uri, final 
Object entity, final Map<String, String> head

<TRUNCATED>
