http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java deleted file mode 100644 index 4d5455f..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ /dev/null @@ -1,3628 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.manager.impl; - -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.io.Serializable; -import java.net.URI; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; - -import javax.net.ssl.SSLContext; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.StreamingOutput; -import javax.xml.XMLConstants; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.transform.dom.DOMSource; -import javax.xml.validation.Schema; -import javax.xml.validation.SchemaFactory; -import javax.xml.validation.Validator; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.admin.service.AuditService; -import org.apache.nifi.cluster.BulletinsPayload; -import org.apache.nifi.cluster.HeartbeatPayload; -import org.apache.nifi.cluster.context.ClusterContext; -import 
org.apache.nifi.cluster.context.ClusterContextImpl; -import org.apache.nifi.cluster.event.Event; -import org.apache.nifi.cluster.event.EventManager; -import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; -import org.apache.nifi.cluster.flow.ClusterDataFlow; -import org.apache.nifi.cluster.flow.DaoException; -import org.apache.nifi.cluster.flow.DataFlowManagementService; -import org.apache.nifi.cluster.flow.PersistedFlowState; -import org.apache.nifi.cluster.manager.HttpClusterManager; -import org.apache.nifi.cluster.manager.HttpRequestReplicator; -import org.apache.nifi.cluster.manager.HttpResponseMapper; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; -import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; -import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; -import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; -import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; -import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException; -import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException; -import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; -import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException; -import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException; -import org.apache.nifi.cluster.manager.exception.NodeReconnectionException; -import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException; -import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException; -import org.apache.nifi.cluster.manager.exception.UnknownNodeException; -import org.apache.nifi.cluster.manager.exception.UriConstructionException; -import org.apache.nifi.cluster.node.Node; -import org.apache.nifi.cluster.node.Node.Status; -import org.apache.nifi.cluster.protocol.ConnectionRequest; -import org.apache.nifi.cluster.protocol.ConnectionResponse; -import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeBulletins; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.StandardDataFlow; -import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener; -import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster; -import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; -import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.ControllerService; -import 
org.apache.nifi.controller.Heartbeater; -import org.apache.nifi.controller.ReportingTaskNode; -import org.apache.nifi.controller.ValidationContextFactory; -import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; -import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; -import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; -import org.apache.nifi.controller.scheduling.StandardProcessScheduler; -import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; -import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.controller.service.StandardControllerServiceProvider; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.history.ComponentStatusRepository; -import org.apache.nifi.controller.status.history.MetricDescriptor; -import org.apache.nifi.controller.status.history.StatusHistory; -import org.apache.nifi.controller.status.history.StatusHistoryUtil; -import org.apache.nifi.controller.status.history.StatusSnapshot; -import org.apache.nifi.diagnostics.GarbageCollection; -import org.apache.nifi.diagnostics.StorageUsage; -import org.apache.nifi.diagnostics.SystemDiagnostics; -import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.events.VolatileBulletinRepository; -import org.apache.nifi.framework.security.util.SslContextFactory; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.logging.NiFiLog; -import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.nar.NarThreadContextClassLoader; -import org.apache.nifi.processor.StandardValidationContextFactory; -import org.apache.nifi.remote.RemoteResourceManager; -import org.apache.nifi.remote.RemoteSiteListener; -import org.apache.nifi.remote.SocketRemoteSiteListener; -import org.apache.nifi.remote.cluster.ClusterNodeInformation; -import org.apache.nifi.remote.cluster.NodeInformation; -import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol; -import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.reporting.ReportingInitializationContext; -import org.apache.nifi.reporting.ReportingTask; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.DomUtils; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.NodeDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; -import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO; -import 
org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; -import org.apache.nifi.web.api.entity.FlowSnippetEntity; -import org.apache.nifi.web.api.entity.ProcessGroupEntity; -import org.apache.nifi.web.api.entity.ProcessorEntity; -import org.apache.nifi.web.api.entity.ProcessorsEntity; -import org.apache.nifi.web.api.entity.ProvenanceEntity; -import org.apache.nifi.web.api.entity.ProvenanceEventEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; -import org.apache.nifi.web.util.WebUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.w3c.dom.DOMException; -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; -import org.xml.sax.SAXException; -import org.xml.sax.SAXParseException; - -import com.sun.jersey.api.client.ClientResponse; - -/** - * Provides a cluster manager implementation. The manager federates incoming - * HTTP client requests to the nodes' external API using the HTTP protocol. The - * manager also communicates with nodes using the nodes' internal socket - * protocol. - * - * The manager's socket address may broadcasted using multicast if a - * MulticastServiceBroadcaster instance is set on this instance. The manager - * instance must be started after setting the broadcaster. - * - * The manager may be configured with an EventManager for recording noteworthy - * lifecycle events (e.g., first heartbeat received, node status change). - * - * The start() and stop() methods must be called to initialize and stop the - * instance. - * - * @author unattributed - */ -public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider { - - public static final String ROOT_GROUP_ID_ALIAS = "root"; - public static final String BULLETIN_CATEGORY = "Clustering"; - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class)); - private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat")); - - /** - * The HTTP header to store a cluster context. An example of what may be - * stored in the context is a node's auditable actions in response to a - * cluster request. The cluster context is serialized using Java's - * serialization mechanism and hex encoded. - */ - public static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext"; - - /** - * HTTP Header that stores a unique ID for each request that is replicated - * to the nodes. This is used for logging purposes so that request - * information, such as timing, can be correlated between the NCM and the - * nodes - */ - public static final String REQUEST_ID_HEADER = "X-RequestID"; - - /** - * The HTTP header that the NCM specifies to ask a node if they are able to - * process a given request. The value is always 150-NodeContinue. The node - * will respond with 150 CONTINUE if it is able to process the request, 417 - * EXPECTATION_FAILED otherwise. - */ - public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects"; - public static final int NODE_CONTINUE_STATUS_CODE = 150; - - /** - * The HTTP header that the NCM specifies to indicate that a node should - * invalidate the specified user group. 
This is done to ensure that user - * cache is not stale when an administrator modifies a group through the UI. - */ - public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = "X-ClusterInvalidateUserGroup"; - - /** - * The HTTP header that the NCM specifies to indicate that a node should - * invalidate the specified user. This is done to ensure that user cache is - * not stale when an administrator modifies a user through the UI. - */ - public static final String CLUSTER_INVALIDATE_USER_HEADER = "X-ClusterInvalidateUser"; - - /** - * The default number of seconds to respond to a connecting node if the - * manager cannot provide it with a current data flow. - */ - private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5; - - public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository"; - - public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); - public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}"); - - public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); - public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}"); - - public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))"); - public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance"); - public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance"); - - public static final String PROVENANCE_URI = "/nifi-api/controller/provenance"; - public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}"); - public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+"); - - private final NiFiProperties properties; - private final HttpRequestReplicator httpRequestReplicator; - private final HttpResponseMapper httpResponseMapper; - private final DataFlowManagementService dataFlowManagementService; - private final ClusterManagerProtocolSenderListener senderListener; - private final StringEncryptor encryptor; - private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>(); - private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock(); - private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read"); - private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write"); - - private final Set<Node> nodes = new HashSet<>(); - private final Set<ReportingTaskNode> reportingTasks = new HashSet<>(); - - // null means the dataflow should be read from disk - private StandardDataFlow cachedDataFlow = null; - private NodeIdentifier primaryNodeId = null; - private Revision revision = new Revision(0L, ""); - private Timer heartbeatMonitor; - private Timer heartbeatProcessor; - 
private volatile ClusterServicesBroadcaster servicesBroadcaster = null; - private volatile EventManager eventManager = null; - private volatile ClusterNodeFirewall clusterFirewall = null; - private volatile AuditService auditService = null; - private volatile ControllerServiceProvider controllerServiceProvider = null; - - private final RemoteSiteListener remoteSiteListener; - private final Integer remoteInputPort; - private final Boolean remoteCommsSecure; - private final BulletinRepository bulletinRepository; - private final String instanceId; - private final FlowEngine reportingTaskEngine; - private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>(); - private final StandardProcessScheduler processScheduler; - private final long componentStatusSnapshotMillis; - - public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper, - final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener, - final NiFiProperties properties, final StringEncryptor encryptor) { - - if (httpRequestReplicator == null) { - throw new IllegalArgumentException("HttpRequestReplicator may not be null."); - } else if (httpResponseMapper == null) { - throw new IllegalArgumentException("HttpResponseMapper may not be null."); - } else if (dataFlowManagementService == null) { - throw new IllegalArgumentException("DataFlowManagementService may not be null."); - } else if (senderListener == null) { - throw new IllegalArgumentException("ClusterManagerProtocolSenderListener may not be null."); - } else if (properties == null) { - throw new IllegalArgumentException("NiFiProperties may not be null."); - } - - // Ensure that our encryptor/decryptor is properly initialized - this.httpRequestReplicator = httpRequestReplicator; - this.httpResponseMapper = httpResponseMapper; - this.dataFlowManagementService = dataFlowManagementService; - this.properties = properties; - this.controllerServiceProvider = new StandardControllerServiceProvider(); - this.bulletinRepository = new VolatileBulletinRepository(); - this.instanceId = UUID.randomUUID().toString(); - this.senderListener = senderListener; - this.encryptor = encryptor; - senderListener.addHandler(this); - senderListener.setBulletinRepository(bulletinRepository); - - final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); - long snapshotMillis; - try { - snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); - } - componentStatusSnapshotMillis = snapshotMillis; - - remoteInputPort = properties.getRemoteInputPort(); - if (remoteInputPort == null) { - remoteSiteListener = null; - remoteCommsSecure = null; - } else { - // Register the ClusterManagerServerProtocol as the appropriate resource for site-to-site Server Protocol - RemoteResourceManager.setServerProtocolImplementation(ClusterManagerServerProtocol.RESOURCE_NAME, ClusterManagerServerProtocol.class); - remoteCommsSecure = properties.isSiteToSiteSecure(); - if (remoteCommsSecure) { - final SSLContext sslContext = SslContextFactory.createSslContext(properties, false); - - if (sslContext == null) { - throw new IllegalStateException("NiFi Configured to allow Secure 
Site-to-Site communications but the Keystore/Truststore properties are not configured"); - } - - remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), sslContext, this); - } else { - remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), null, this); - } - } - - reportingTaskEngine = new FlowEngine(8, "Reporting Task Thread"); - - processScheduler = new StandardProcessScheduler(new Heartbeater() { - @Override - public void heartbeat() { - } - }, this, encryptor); - processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor)); - processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); - processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); - } - - public void start() throws IOException { - writeLock.lock(); - try { - - if (isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - try { - // setup heartbeat monitoring - heartbeatMonitor = new Timer("Heartbeat Monitor", /* is daemon */ true); - heartbeatMonitor.scheduleAtFixedRate(new HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 1000); - - heartbeatProcessor = new Timer("Process Pending Heartbeats", true); - final int processPendingHeartbeatDelay = 1000 * Math.max(1, getClusterProtocolHeartbeatSeconds() / 2); - heartbeatProcessor.schedule(new ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, processPendingHeartbeatDelay); - - // start request replication service - httpRequestReplicator.start(); - - // start protocol service - senderListener.start(); - - // start flow management service - dataFlowManagementService.start(); - - if (remoteSiteListener != null) { - remoteSiteListener.start(); - } - - // load flow - if (dataFlowManagementService.isFlowCurrent()) { - final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow(); - cachedDataFlow = clusterDataFlow.getDataFlow(); - primaryNodeId = clusterDataFlow.getPrimaryNodeId(); - } else { - throw new IOException("Flow is not current."); - } - - // start multicast broadcasting service, if configured - if (servicesBroadcaster != null) { - servicesBroadcaster.start(); - } - - // start in safe mode - executeSafeModeTask(); - - // Load and start running Reporting Tasks - final File taskFile = new File(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE)); - reportingTasks.addAll(loadReportingTasks(taskFile)); - } catch (final IOException ioe) { - logger.warn("Failed to initialize cluster services due to: " + ioe, ioe); - stop(); - throw ioe; - } - - } finally { - writeLock.unlock("START"); - } - } - - public void stop() throws IOException { - writeLock.lock(); - try { - - // returns true if any service is running - if (isRunning() == false) { - throw new IllegalArgumentException("Instance is already stopped."); - } - - boolean encounteredException = false; - - // stop the heartbeat monitoring - if (isHeartbeatMonitorRunning()) { - heartbeatMonitor.cancel(); - heartbeatMonitor = null; - } - - if (heartbeatProcessor != null) { - heartbeatProcessor.cancel(); - heartbeatProcessor = null; - } - - // stop the HTTP request replicator service - if (httpRequestReplicator.isRunning()) { - httpRequestReplicator.stop(); - } - - // stop the flow management service - if (dataFlowManagementService.isRunning()) { - dataFlowManagementService.stop(); - } - - if (remoteSiteListener != null) { - remoteSiteListener.stop(); - } - - // stop the protocol 
listener service - if (senderListener.isRunning()) { - try { - senderListener.stop(); - } catch (final IOException ioe) { - encounteredException = true; - logger.warn("Failed to shutdown protocol service due to: " + ioe, ioe); - } - } - - // stop the service broadcaster - if (isBroadcasting()) { - servicesBroadcaster.stop(); - } - - if ( processScheduler != null ) { - processScheduler.shutdown(); - } - - if (encounteredException) { - throw new IOException("Failed to shutdown Cluster Manager because one or more cluster services failed to shutdown. Check the logs for details."); - } - - } finally { - writeLock.unlock("STOP"); - } - } - - public boolean isRunning() { - readLock.lock(); - try { - return isHeartbeatMonitorRunning() - || httpRequestReplicator.isRunning() - || senderListener.isRunning() - || dataFlowManagementService.isRunning() - || isBroadcasting(); - } finally { - readLock.unlock("isRunning"); - } - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return MessageType.CONNECTION_REQUEST == msg.getType() - || MessageType.HEARTBEAT == msg.getType() - || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType() - || MessageType.BULLETINS == msg.getType() - || MessageType.RECONNECTION_FAILURE == msg.getType(); - } - - @Override - public ProtocolMessage handle(final ProtocolMessage protocolMessage) throws ProtocolException { - switch (protocolMessage.getType()) { - case CONNECTION_REQUEST: - return handleConnectionRequest((ConnectionRequestMessage) protocolMessage); - case HEARTBEAT: - final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) protocolMessage; - - final Heartbeat original = heartbeatMessage.getHeartbeat(); - final NodeIdentifier originalNodeId = original.getNodeIdentifier(); - final Heartbeat heartbeatWithDn = new Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), original.isPrimary(), original.isConnected(), original.getPayload()); - - handleHeartbeat(heartbeatWithDn); - return null; - case CONTROLLER_STARTUP_FAILURE: - new Thread(new Runnable() { - @Override - public void run() { - handleControllerStartupFailure((ControllerStartupFailureMessage) protocolMessage); - } - }, "Handle Controller Startup Failure Message from " + ((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start(); - return null; - case RECONNECTION_FAILURE: - new Thread(new Runnable() { - @Override - public void run() { - handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage); - } - }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start(); - return null; - case BULLETINS: - final NodeBulletinsMessage bulletinsMessage = (NodeBulletinsMessage) protocolMessage; - handleBulletins(bulletinsMessage.getBulletins()); - return null; - default: - throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType()); - } - } - - /** - * Services connection requests. If the data flow management service is - * unable to provide a current copy of the data flow, then the returned - * connection response will indicate the node should try later. Otherwise, - * the connection response will contain the the flow and the node - * identifier. - * - * If this instance is configured with a firewall and the request is - * blocked, then the response will not contain a node identifier. 
- * - * @param request a connection request - * - * @return a connection response - */ - @Override - public ConnectionResponse requestConnection(final ConnectionRequest request) { - final boolean lockObtained = writeLock.tryLock(3, TimeUnit.SECONDS); - if (!lockObtained) { - // Create try-later response because we are too busy to service the request right now. We do not want - // to wait long because we want Node/NCM comms to be very responsive - final int tryAgainSeconds; - if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) { - tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS; - } else { - tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds(); - } - - // record event - final String msg = "Connection requested from node, but manager was too busy to service request. Instructing node to try again in " + tryAgainSeconds + " seconds."; - addEvent(request.getProposedNodeIdentifier(), msg); - addBulletin(request.getProposedNodeIdentifier(), Severity.INFO, msg); - - // return try later response - return new ConnectionResponse(tryAgainSeconds); - } - - try { - // resolve the proposed node identifier to a valid node identifier - final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(request.getProposedNodeIdentifier()); - - if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) { - // if the socket address is not listed in the firewall, then return a null response - logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier); - return ConnectionResponse.createBlockedByFirewallResponse(); - } - - // get a raw reference to the node (if it doesn't exist, node will be null) - Node node = getRawNode(resolvedNodeIdentifier.getId()); - - // create a new node if necessary and set status to connecting - if (node == null) { - node = new Node(resolvedNodeIdentifier, Status.CONNECTING); - addEvent(node.getNodeId(), "Connection requested from new node. Setting status to connecting."); - nodes.add(node); - } else { - node.setStatus(Status.CONNECTING); - addEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting"); - } - - // record the time of the connection request - node.setConnectionRequestedTimestamp(new Date().getTime()); - - // clear out old heartbeat info - node.setHeartbeat(null); - - // try to obtain a current flow - if (dataFlowManagementService.isFlowCurrent()) { - // if a cached copy does not exist, load it from disk - if (cachedDataFlow == null) { - final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow(); - cachedDataFlow = clusterDataFlow.getDataFlow(); - primaryNodeId = clusterDataFlow.getPrimaryNodeId(); - } - - // determine if this node should be assigned the primary role - final boolean primaryRole; - if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) { - setPrimaryNodeId(node.getNodeId()); - addEvent(node.getNodeId(), "Setting primary role in connection response."); - primaryRole = true; - } else { - primaryRole = false; - } - - return new ConnectionResponse(node.getNodeId(), cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId); - } - - /* - * The manager does not have a current copy of the data flow, - * so it will instruct the node to try connecting at a later - * time. Meanwhile, the flow will be locked down from user - * changes because the node is marked as connecting. 
- */ - - /* - * Create try-later response based on flow retrieval delay to give - * the flow management service a chance to retrieve a curren flow - */ - final int tryAgainSeconds; - if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) { - tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS; - } else { - tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds(); - } - - // record event - addEvent(node.getNodeId(), "Connection requested from node, but manager was unable to obtain current flow. Instructing node to try again in " + tryAgainSeconds + " seconds."); - - // return try later response - return new ConnectionResponse(tryAgainSeconds); - - } finally { - writeLock.unlock("requestConnection"); - } - } - - /** - * Services reconnection requests for a given node. If the node indicates - * reconnection failure, then the node will be set to disconnected and if - * the node has primary role, then the role will be revoked. Otherwise, a - * reconnection request will be sent to the node, initiating the connection - * handshake. - * - * @param nodeId a node identifier - * - * @throws UnknownNodeException if the node does not exist - * @throws IllegalNodeReconnectionException if the node cannot be - * reconnected because the node is not disconnected - * @throws NodeReconnectionException if the reconnection message failed to - * be sent or the cluster could not provide a current data flow for the - * reconnection request - */ - @Override - public void requestReconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeReconnectionException { - Node node = null; - - final boolean primaryRole; - final int tryAgainSeconds; - - writeLock.lock(); - try { - // check if we know about this node and that it is disconnected - node = getRawNode(nodeId); - logger.info("Request was made by {} to reconnect node {} to cluster", userDn, node == null ? nodeId : node); - - if (node == null) { - throw new UnknownNodeException("Node does not exist."); - } else if (Status.DISCONNECTED != node.getStatus()) { - throw new IllegalNodeReconnectionException("Node must be disconnected before it can reconnect."); - } - - // clear out old heartbeat info - node.setHeartbeat(null); - - // get the dataflow to send with the reconnection request - if (!dataFlowManagementService.isFlowCurrent()) { - /* node remains disconnected */ - final String msg = "Reconnection requested for node, but manager was unable to obtain current flow. Setting node to disconnected."; - addEvent(node.getNodeId(), msg); - addBulletin(node, Severity.WARNING, msg); - throw new NodeReconnectionException("Manager was unable to obtain current flow to provide in reconnection request to node. Try again in a few seconds."); - } - - // if a cached copy does not exist, load it from disk - if (cachedDataFlow == null) { - final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow(); - cachedDataFlow = clusterDataFlow.getDataFlow(); - primaryNodeId = clusterDataFlow.getPrimaryNodeId(); - } - - node.setStatus(Status.CONNECTING); - addEvent(node.getNodeId(), "Reconnection requested for node. 
Setting status to connecting."); - - // determine if this node should be assigned the primary role - if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) { - setPrimaryNodeId(node.getNodeId()); - addEvent(node.getNodeId(), "Setting primary role in reconnection request."); - primaryRole = true; - } else { - primaryRole = false; - } - - if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) { - tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS; - } else { - tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds(); - } - } catch (final UnknownNodeException | IllegalNodeReconnectionException | NodeReconnectionException une) { - throw une; - } catch (final Exception ex) { - logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + ex, ex); - - node.setStatus(Status.DISCONNECTED); - final String eventMsg = "Problem encountered issuing reconnection request. Node will remain disconnected: " + ex; - addEvent(node.getNodeId(), eventMsg); - addBulletin(node, Severity.WARNING, eventMsg); - - // Exception thrown will include node ID but event/bulletin do not because the node/id is passed along with the message - throw new NodeReconnectionException("Problem encountered issuing reconnection request to " + node.getNodeId() + ". Node will remain disconnected: " + ex, ex); - } finally { - writeLock.unlock("requestReconnection"); - } - - // Asynchronously start attempting reconnection. This is not completely thread-safe, as - // we do this by releasing the write lock and then obtaining a read lock for each attempt, - // so we suffer from the ABA problem. However, we are willing to accept the consequences of - // this situation in order to avoid holding a lock for the entire duration. "The consequences" - // are that a second thread could potentially be doing the same thing, issuing a reconnection request. - // However, this is very unlikely to happen, based on the conditions under which we issue a reconnection - // request. And if we do, the node will simply reconnect multiple times, which is not a big deal. - requestReconnectionAsynchronously(node, primaryRole, 10, tryAgainSeconds); - } - - private void requestReconnectionAsynchronously(final Node node, final boolean primaryRole, final int reconnectionAttempts, final int retrySeconds) { - final Thread reconnectionThread = new Thread(new Runnable() { - @Override - public void run() { - for (int i = 0; i < reconnectionAttempts; i++) { - final ReconnectionRequestMessage request = new ReconnectionRequestMessage(); - - try { - readLock.lock(); - try { - if (Status.CONNECTING != node.getStatus()) { - // the node status has changed. It's no longer appropriate to attempt reconnection. - return; - } - - // create the request - request.setNodeId(node.getNodeId()); - request.setDataFlow(cachedDataFlow); - request.setPrimary(primaryRole); - request.setManagerRemoteSiteCommsSecure(remoteCommsSecure); - request.setManagerRemoteSiteListeningPort(remoteInputPort); - request.setInstanceId(instanceId); - } finally { - readLock.unlock("Reconnect " + node.getNodeId()); - } - - // Issue a reconnection request to the node. - senderListener.requestReconnection(request); - - node.setConnectionRequestedTimestamp(System.currentTimeMillis()); - - // successfully told node to reconnect -- we're done! 
- return; - } catch (final Exception e) { - logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + e); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - - addBulletin(node, Severity.WARNING, "Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + e); - } - - try { - Thread.sleep(1000L * retrySeconds); - } catch (final InterruptedException ie) { - break; - } - } - - // We failed to reconnect 10 times. We must now mark node as disconnected. - writeLock.lock(); - try { - if (Status.CONNECTING == node.getStatus()) { - requestDisconnectionQuietly(node.getNodeId(), "Failed to issue Reconnection Request " + reconnectionAttempts + " times"); - } - } finally { - writeLock.unlock("Mark node as Disconnected as a result of reconnection failure"); - } - } - }, "Reconnect " + node.getNodeId()); - - reconnectionThread.start(); - } - - private List<ReportingTaskNode> loadReportingTasks(final File taskConfigXml) { - final List<ReportingTaskNode> tasks = new ArrayList<>(); - if (taskConfigXml == null) { - logger.info("No controller tasks to start"); - return tasks; - } - - try { - final URL schemaUrl = getClass().getResource("/ReportingTaskConfiguration.xsd"); - final Document document = parse(taskConfigXml, schemaUrl); - - final NodeList tasksNodes = document.getElementsByTagName("tasks"); - final Element tasksElement = (Element) tasksNodes.item(0); - - //optional properties for all ReportingTasks - for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) { - //add global properties common to all tasks - Map<String, String> properties = new HashMap<>(); - - //get properties for the specific reporting task - id, name, class, - //and schedulingPeriod must be set - final String taskId = DomUtils.getChild(taskElement, "id").getTextContent().trim(); - final String taskName = DomUtils.getChild(taskElement, "name").getTextContent().trim(); - - final List<Element> schedulingStrategyNodeList = DomUtils.getChildElementsByTagName(taskElement, "schedulingStrategy"); - String schedulingStrategyValue = SchedulingStrategy.TIMER_DRIVEN.name(); - if (schedulingStrategyNodeList.size() == 1) { - final String specifiedValue = schedulingStrategyNodeList.get(0).getTextContent(); - - try { - schedulingStrategyValue = SchedulingStrategy.valueOf(specifiedValue).name(); - } catch (final Exception e) { - throw new RuntimeException("Cannot start Reporting Task with id " + taskId + " because its Scheduling Strategy does not have a valid value", e); - } - } - - final SchedulingStrategy schedulingStrategy = SchedulingStrategy.valueOf(schedulingStrategyValue); - final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim(); - final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim(); - - //optional task-specific properties - for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) { - final String name = optionalProperty.getAttribute("name"); - final String value = optionalProperty.getTextContent().trim(); - properties.put(name, value); - } - - //set the class to be used for the configured reporting task - final ReportingTaskNode reportingTaskNode; - try { - reportingTaskNode = createReportingTask(taskClass, taskId); - } catch (final ReportingTaskInstantiationException e) { - logger.error("Unable to load reporting task {} due to {}", new Object[]{taskId, e}); - if 
(logger.isDebugEnabled()) { - logger.error("", e); - } - continue; - } - - final ReportingTask reportingTask = reportingTaskNode.getReportingTask(); - - final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName, schedulingStrategy, taskSchedulingPeriod, this); - reportingTask.initialize(config); - - final Map<PropertyDescriptor, String> resolvedProps; - try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { - resolvedProps = new HashMap<>(); - for (final Map.Entry<String, String> entry : properties.entrySet()) { - final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey()); - resolvedProps.put(descriptor, entry.getValue()); - } - } - - for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) { - reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue()); - } - - processScheduler.schedule(reportingTaskNode); - tasks.add(reportingTaskNode); - } - } catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) { - logger.error("Unable to load reporting tasks from {} due to {}", new Object[]{taskConfigXml, t}); - if (logger.isDebugEnabled()) { - logger.error("", t); - } - } - - return tasks; - } - - private ReportingTaskNode createReportingTask(final String type, final String id) throws ReportingTaskInstantiationException { - if (type == null) { - throw new NullPointerException(); - } - ReportingTask task = null; - final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); - try { - final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type); - final Class<?> rawClass; - if (detectedClassLoader == null) { - rawClass = Class.forName(type); - } else { - rawClass = Class.forName(type, false, detectedClassLoader); - } - - Thread.currentThread().setContextClassLoader(detectedClassLoader); - final Class<? 
extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class); - final Object reportingTaskObj = reportingTaskClass.newInstance(); - task = reportingTaskClass.cast(reportingTaskObj); - } catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) { - throw new ReportingTaskInstantiationException(type, t); - } finally { - if (ctxClassLoader != null) { - Thread.currentThread().setContextClassLoader(ctxClassLoader); - } - } - - final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this); - final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler, - new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory); - return taskNode; - } - - private Document parse(final File xmlFile, final URL schemaUrl) throws SAXException, ParserConfigurationException, IOException { - final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); - final Schema schema = schemaFactory.newSchema(schemaUrl); - final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); - docFactory.setSchema(schema); - final DocumentBuilder builder = docFactory.newDocumentBuilder(); - - builder.setErrorHandler(new org.xml.sax.ErrorHandler() { - @Override - public void fatalError(final SAXParseException err) throws SAXException { - logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage()); - if (logger.isDebugEnabled()) { - logger.error("Error Stack Dump", err); - } - throw err; - } - - @Override - public void error(final SAXParseException err) throws SAXParseException { - logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage()); - if (logger.isDebugEnabled()) { - logger.error("Error Stack Dump", err); - } - throw err; - } - - @Override - public void warning(final SAXParseException err) throws SAXParseException { - logger.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage()); - if (logger.isDebugEnabled()) { - logger.warn("Warning stack dump", err); - } - throw err; - } - }); - - // build the docuemnt - final Document document = builder.parse(xmlFile); - - // ensure schema compliance - final Validator validator = schema.newValidator(); - validator.validate(new DOMSource(document)); - - return document; - } - - private void addBulletin(final Node node, final Severity severity, final String msg) { - addBulletin(node.getNodeId(), severity, msg); - } - - private void addBulletin(final NodeIdentifier nodeId, final Severity severity, final String msg) { - bulletinRepository.addBulletin(BulletinFactory.createBulletin(BULLETIN_CATEGORY, severity.toString(), - nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg)); - } - - /** - * Services a disconnection request. - * - * @param nodeId a node identifier - * @param userDn the DN of the user requesting the disconnection - * - * @throws UnknownNodeException if the node does not exist - * @throws IllegalNodeDisconnectionException if the node cannot be - * disconnected due to the cluster's state (e.g., node is last connected - * node or node is primary) - * @throws NodeDisconnectionException if the disconnection message fails to - * be sent. 
- */ - @Override - public void requestDisconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException { - writeLock.lock(); - try { - // check that the node is known - final Node node = getNode(nodeId); - if (node == null) { - throw new UnknownNodeException("Node does not exist."); - } - requestDisconnection(node.getNodeId(), /* ignore last node */ false, "User " + userDn + " Disconnected Node"); - } finally { - writeLock.unlock("requestDisconnection(String)"); - } - } - - /** - * Requests a disconnection to the node with the given node ID, but any - * exception thrown is suppressed. - * - * @param nodeId the node ID - */ - private void requestDisconnectionQuietly(final NodeIdentifier nodeId, final String explanation) { - try { - requestDisconnection(nodeId, /* ignore node check */ true, explanation); - } catch (final IllegalNodeDisconnectionException | NodeDisconnectionException ex) { /* suppress exception */ } - } - - /** - * Issues a disconnection message to the node identified by the given node - * ID. If the node is not known, then a UnknownNodeException is thrown. If - * the node cannot be disconnected due to the cluster's state and - * ignoreLastNodeCheck is false, then a IllegalNodeDisconnectionException is - * thrown. Otherwise, a disconnection message is issued to the node. - * - * Whether the disconnection message is successfully sent to the node, the - * node is marked as disconnected and if the node is the primary node, then - * the primary role is revoked. - * - * @param nodeId the ID of the node - * @param ignoreNodeChecks if false, checks will be made to ensure the - * cluster supports the node's disconnection (e.g., the node is not the last - * connected node in the cluster; the node is not the primary); otherwise, - * the request is made regardless of the cluster state - * @param explanation - * - * @throws IllegalNodeDisconnectionException if the node cannot be - * disconnected due to the cluster's state (e.g., node is last connected - * node or node is primary). Not thrown if ignoreNodeChecks is true. - * @throws NodeDisconnectionException if the disconnection message fails to - * be sent. 
- */ - private void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation) - throws IllegalNodeDisconnectionException, NodeDisconnectionException { - - writeLock.lock(); - try { - - // check that the node is known - final Node node = getRawNode(nodeId.getId()); - if (node == null) { - if (ignoreNodeChecks) { - // issue the disconnection - final DisconnectMessage request = new DisconnectMessage(); - request.setNodeId(nodeId); - request.setExplanation(explanation); - - addEvent(nodeId, "Disconnection requested due to " + explanation); - senderListener.disconnect(request); - addEvent(nodeId, "Node disconnected due to " + explanation); - addBulletin(nodeId, Severity.INFO, "Node disconnected due to " + explanation); - return; - } else { - throw new UnknownNodeException("Node does not exist"); - } - } - - // if necessary, check that the node may be disconnected - if (!ignoreNodeChecks) { - final Set<NodeIdentifier> connectedNodes = getNodeIds(Status.CONNECTED); - // cannot disconnect the last connected node in the cluster - if (connectedNodes.size() == 1 && connectedNodes.iterator().next().equals(nodeId)) { - throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the only connected node in the cluster."); - } else if (isPrimaryNode(nodeId)) { - // cannot disconnect the primary node in the cluster - throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the primary node in the cluster."); - } - } - - // update status - node.setStatus(Status.DISCONNECTED); - notifyDataFlowManagementServiceOfNodeStatusChange(); - - // issue the disconnection - final DisconnectMessage request = new DisconnectMessage(); - request.setNodeId(nodeId); - request.setExplanation(explanation); - - addEvent(nodeId, "Disconnection requested due to " + explanation); - senderListener.disconnect(request); - addEvent(nodeId, "Node disconnected due to " + explanation); - addBulletin(node, Severity.INFO, "Node disconnected due to " + explanation); - } finally { - writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)"); - } - } - - /** - * Messages the node to have the primary role. If the messaging fails, then - * the node is marked as disconnected. 
- * - * @param nodeId the node ID to assign primary role - * - * @return true if primary role assigned; false otherwise - */ - private boolean assignPrimaryRole(final NodeIdentifier nodeId) { - writeLock.lock(); - try { - // create primary role message - final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage(); - msg.setNodeId(nodeId); - msg.setPrimary(true); - logger.info("Attempting to assign primary role to node: " + nodeId); - - // message - senderListener.assignPrimaryRole(msg); - - logger.info("Assigned primary role to node: " + nodeId); - addBulletin(nodeId, Severity.INFO, "Node assigned primary role"); - - // true indicates primary role assigned - return true; - - } catch (final ProtocolException ex) { - - logger.warn("Failed attempt to assign primary role to node " + nodeId + " due to " + ex); - addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role to node due to: " + ex); - - // mark node as disconnected and log/record the event - final Node node = getRawNode(nodeId.getId()); - node.setStatus(Status.DISCONNECTED); - addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role."); - - addBulletin(nodeId, Severity.WARNING, "Node disconnected because of failed attempt to assign primary role"); - - // false indicates primary role failed to be assigned - return false; - } finally { - writeLock.unlock("assignPrimaryRole"); - } - } - - /** - * Messages the node with the given node ID to no longer have the primary - * role. If the messaging fails, then the node is marked as disconnected. - * - * @return true if the primary role was revoked from the node; false - * otherwise - */ - private boolean revokePrimaryRole(final NodeIdentifier nodeId) { - writeLock.lock(); - try { - // create primary role message - final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage(); - msg.setNodeId(nodeId); - msg.setPrimary(false); - logger.info("Attempting to revoke primary role from node: " + nodeId); - - // send message - senderListener.assignPrimaryRole(msg); - - logger.info("Revoked primary role from node: " + nodeId); - addBulletin(nodeId, Severity.INFO, "Primary Role revoked from node"); - - // true indicates primary role was revoked - return true; - } catch (final ProtocolException ex) { - - logger.warn("Failed attempt to revoke primary role from node " + nodeId + " due to " + ex); - - // mark node as disconnected and log/record the event - final Node node = getRawNode(nodeId.getId()); - node.setStatus(Status.DISCONNECTED); - addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role."); - addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role"); - - // false indicates primary role failed to be revoked - return false; - } finally { - writeLock.unlock("revokePrimaryRole"); - } - } - - private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) { - return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), - nodeId.getApiPort(), nodeId.getSocketAddress(), nodeId.getSocketPort(), dn); - } - - private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) { - final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier(); - final ConnectionRequest requestWithDn = new ConnectionRequest(addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN())); - - final ConnectionResponse response = requestConnection(requestWithDn); - 
final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage(); - responseMessage.setConnectionResponse(response); - return responseMessage; - } - - private void handleControllerStartupFailure(final ControllerStartupFailureMessage msg) { - writeLock.lock(); - try { - final Node node = getRawNode(msg.getNodeId().getId()); - if (node != null) { - node.setStatus(Status.DISCONNECTED); - addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage()); - addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage()); - } - } finally { - writeLock.unlock("handleControllerStartupFailure"); - } - } - - private void handleReconnectionFailure(final ReconnectionFailureMessage msg) { - writeLock.lock(); - try { - final Node node = getRawNode(msg.getNodeId().getId()); - if (node != null) { - node.setStatus(Status.DISCONNECTED); - final String errorMsg = "Node could not rejoin cluster. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage(); - addEvent(msg.getNodeId(), errorMsg); - addBulletin(node, Severity.ERROR, errorMsg); - } - } finally { - writeLock.unlock("handleControllerStartupFailure"); - } - } - - - @Override - public ControllerService getControllerService(String serviceIdentifier) { - return controllerServiceProvider.getControllerService(serviceIdentifier); - } - - @Override - public ControllerServiceNode getControllerServiceNode(final String id) { - return controllerServiceProvider.getControllerServiceNode(id); - } - - @Override - public boolean isControllerServiceEnabled(final ControllerService service) { - return controllerServiceProvider.isControllerServiceEnabled(service); - } - - @Override - public boolean isControllerServiceEnabled(final String serviceIdentifier) { - return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier); - } - - @Override - public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { - return controllerServiceProvider.createControllerService(type, id, firstTimeAdded); - } - - @Override - public void removeControllerService(final ControllerServiceNode serviceNode) { - controllerServiceProvider.removeControllerService(serviceNode); - } - - - @Override - public void enableControllerService(final ControllerServiceNode serviceNode) { - controllerServiceProvider.enableControllerService(serviceNode); - } - - @Override - public void disableControllerService(final ControllerServiceNode serviceNode) { - controllerServiceProvider.disableControllerService(serviceNode); - } - - - /** - * Handle a bulletins message. - * - * @param bulletins - */ - public void handleBulletins(final NodeBulletins bulletins) { - final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier(); - final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); - - // unmarshal the message - BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload()); - for (final Bulletin bulletin : payload.getBulletins()) { - bulletin.setNodeAddress(nodeAddress); - bulletinRepository.addBulletin(bulletin); - } - } - - /** - * Handles a node's heartbeat. 
If this heartbeat is a node's first heartbeat - * since its connection request, then the manager will mark the node as - * connected. If the node was previously disconnected due to a lack of - * heartbeat, then a reconnection request is issued. If the node was - * disconnected for other reasons, then a disconnection request is issued. - * If this instance is configured with a firewall and the heartbeat is - * blocked, then a disconnection request is issued. - * - * @param heartbeat the heartbeat received from a node - */ - @Override - public void handleHeartbeat(final Heartbeat heartbeat) { - // sanity check heartbeat - if (heartbeat == null) { - throw new IllegalArgumentException("Heartbeat may not be null."); - } else if (heartbeat.getNodeIdentifier() == null) { - throw new IllegalArgumentException("Heartbeat does not contain a node ID."); - } - - /* - * Processing a heartbeat requires a write lock, which may take a while - * to obtain. Only the last heartbeat is necessary to process per node. - * Furthermore, since many could pile up, heartbeats are processed in - * bulk. - * - * The below queue stores the pending heartbeats. - */ - pendingHeartbeats.add(heartbeat); - } - - private void processPendingHeartbeats() { - Node node; - - writeLock.lock(); - try { - /* - * Get the most recent heartbeats for the nodes in the cluster. This - * is achieved by "draining" the pending heartbeats queue, populating - * a map that associates a node identifier with its latest heartbeat, and - * finally, getting the values of the map. - */ - final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = new HashMap<>(); - Heartbeat aHeartbeat; - while ((aHeartbeat = pendingHeartbeats.poll()) != null) { - mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), aHeartbeat); - } - final Collection<Heartbeat> mostRecentHeartbeats = new ArrayList<>(mostRecentHeartbeatsMap.values()); - - // return fast if no work to do - if (mostRecentHeartbeats.isEmpty()) { - return; - } - - logNodes("Before Heartbeat Processing", heartbeatLogger); - - final int numPendingHeartbeats = mostRecentHeartbeats.size(); - if (heartbeatLogger.isDebugEnabled()) { - heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : "")); - } - - for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) { - try { - // resolve the proposed node identifier to a valid node identifier - final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier()); - - // get a raw reference to the node (if it doesn't exist, node will be null) - node = getRawNode(resolvedNodeIdentifier.getId()); - - // if the node thinks it has the primary role, but the manager has assigned the role to a different node, then revoke the role - if (mostRecentHeartbeat.isPrimary() && !isPrimaryNode(resolvedNodeIdentifier)) { - addEvent(resolvedNodeIdentifier, "Heartbeat indicates node is running as primary node. Revoking primary role because primary role is assigned to a different node."); - revokePrimaryRole(resolvedNodeIdentifier); - } - - final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected(); - - if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) { - if (node == null) { - logger.info("Firewall blocked heartbeat received from unknown node " + resolvedNodeIdentifier + ". Issuing disconnection request."); - } else { - // record event - addEvent(resolvedNodeIdentifier, "Firewall blocked received heartbeat. 
Issuing disconnection request."); - } - - // request node to disconnect - requestDisconnectionQuietly(resolvedNodeIdentifier, "Blocked By Firewall"); - - } else if (node == null) { // unknown node, so issue reconnect request - // create new node and add to node set - final Node newNode = new Node(resolvedNodeIdentifier, Status.DISCONNECTED); - nodes.add(newNode); - - // record event - addEvent(newNode.getNodeId(), "Received heartbeat from unknown node. Issuing reconnection request."); - - // record heartbeat - newNode.setHeartbeat(mostRecentHeartbeat); - requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing"); - } else if (heartbeatIndicatesNotYetConnected) { - if (Status.CONNECTED == node.getStatus()) { - // record event - addEvent(node.getNodeId(), "Received heartbeat from node that thinks it is not yet part of the cluster, though the Manager thought it was. Marking as Disconnected and issuing reconnection request."); - - // record heartbeat - node.setHeartbeat(null); - node.setStatus(Status.DISCONNECTED); - - requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing"); - } - } else if (Status.DISCONNECTED == node.getStatus()) { - // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is - // the only node. We allow it if it is the only node because if we have a one-node cluster, then - // we cannot manually reconnect it. - if (node.isHeartbeatDisconnection() || nodes.size() == 1) { - // record event - if (node.isHeartbeatDisconnection()) { - addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected due to lack of heartbeat. Issuing reconnection request."); - } else { - addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected, but it is the only known node, so issuing reconnection request."); - } - - // record heartbeat - node.setHeartbeat(mostRecentHeartbeat); - - // request reconnection - requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing"); - } else { - // disconnected nodes should not heartbeat, so we need to issue a disconnection request - heartbeatLogger.info("Ignoring received heartbeat from disconnected node " + resolvedNodeIdentifier + ". Issuing disconnection request."); - - // request node to disconnect - requestDisconnectionQuietly(resolvedNodeIdentifier, "Received Heartbeat from Node, but Manager has already marked Node as Disconnected"); - } - - } else if (Status.DISCONNECTING == node.getStatus()) { - /* ignore spurious heartbeat */ - } else { // node is either connected or connecting - // first heartbeat causes status change from connecting to connected - if (Status.CONNECTING == node.getStatus()) { - if (mostRecentHeartbeat.getCreatedTimestamp() < node.getConnectionRequestedTimestamp()) { - heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + " but ignoring because it was generated before the node was last asked to reconnect."); - continue; - } - - // set status to connected - node.setStatus(Status.CONNECTED); - - // record event - addEvent(resolvedNodeIdentifier, "Received first heartbeat from connecting node. 
Setting node to connected."); - - // notify service of updated node set - notifyDataFlowManagementServiceOfNodeStatusChange(); - - addBulletin(node, Severity.INFO, "Node Connected"); - } else { - heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + "."); - } - - // record heartbeat - node.setHeartbeat(mostRecentHeartbeat); - - ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - statusRepository = createComponentStatusRepository(); - componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository); - } - - // If it's been a while since we've captured, capture this metric. - final Date lastCaptureDate = statusRepository.getLastCaptureDate(); - final long millisSinceLastCapture = (lastCaptureDate == null) ? Long.MAX_VALUE : (System.currentTimeMillis() - lastCaptureDate.getTime()); - - if (millisSinceLastCapture > componentStatusSnapshotMillis) { - statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus()); - } - } - } catch (final Exception e) { - logger.error("Failed to process heartbeat from {}:{} due to {}", mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - } - } - - logNodes("After Heartbeat Processing", heartbeatLogger); - } finally { - writeLock.unlock("processPendingHeartbeats"); - } - } - - private ComponentStatusRepository createComponentStatusRepository() { - final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); - if (implementationClassName == null) { - throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: " - + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION); - } - - try { - return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public Set<Node> getNodes(final Status... 
statuses) { - final Set<Status> desiredStatusSet = new HashSet<>(); - for (final Status status : statuses) { - desiredStatusSet.add(status); - } - - readLock.lock(); - try { - final Set<Node> clonedNodes = new HashSet<>(); - for (final Node node : nodes) { - if (desiredStatusSet.isEmpty() || desiredStatusSet.contains(node.getStatus())) { - clonedNodes.add(node.clone()); - } - } - return Collections.unmodifiableSet(clonedNodes); - } finally { - readLock.unlock("getNodes(Status...)"); - } - } - - @Override - public Node getNode(final String nodeId) { - readLock.lock(); - try { - for (final Node node : nodes) { - if (node.getNodeId().getId().equals(nodeId)) { - return node.clone(); - } - } - return null; - } finally { - readLock.unlock("getNode(String)"); - } - } - - @Override - public Node getPrimaryNode() { - readLock.lock(); - try { - if (primaryNodeId == null) { - return null; - } else { - return getNode(primaryNodeId.getId()); - } - } finally { - readLock.unlock("getPrimaryNode"); - } - } - - @Override - public void deleteNode(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDeletionException { - writeLock.lock(); - try { - final Node node = getNode(nodeId); - if (node == null) { - throw new UnknownNodeException("Node does not exist."); - } else if (Status.DISCONNECTED == node.getStatus()) { - nodes.remove(node); - - if (eventManager != null) { - eventManager.clearEventHistory(node.getNodeId().getId()); - } - - logger.info("Removing node {} from cluster because this action was requested by {}", node, userDn); - } else { - throw new IllegalNodeDeletionException("Node may not be deleted because it is not disconnected."); - } - } finally { - writeLock.unlock("deleteNode"); - } - } - - @Override - public Set<NodeIdentifier> getNodeIds(final Status... statuses) { - readLock.lock(); - try { - final Set<NodeIdentifier> nodeIds = new HashSet<>(); - for (final Node node : nodes) { - if (statuses == null || statuses.length == 0) { - nodeIds.add(node.getNodeId()); - } else { - for (final Node.Status status : statuses) { - if (node.getStatus() == status) { - nodeIds.add(node.getNodeId()); - break; - } - } - } - } - return nodeIds; - } finally { - readLock.unlock("getNodeIds(Status...)"); - } - } - - @Override - public void setPrimaryNode(final String nodeId, final String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException { - writeLock.lock(); - try { - - final Node node = getNode(nodeId); - if (node == null) { - throw new UnknownNodeException("Node does not exist."); - } else if (Status.CONNECTED != node.getStatus()) { - throw new IneligiblePrimaryNodeException("Node must be connected before it can be assigned as the primary node."); - } - - // revoke primary role - final Node primaryNode; - if ((primaryNode = getPrimaryNode()) != null) { - if (primaryNode.getStatus() == Status.DISCONNECTED) { - throw new PrimaryRoleAssignmentException("A disconnected, primary node exists. Delete the node before assigning the primary role to a different node."); - } else if (revokePrimaryRole(primaryNode.getNodeId())) { - addEvent(primaryNode.getNodeId(), "Role revoked from this node as part of primary role reassignment."); - } else { - throw new PrimaryRoleAssignmentException( - "Failed to revoke primary role from node. Primary node is now disconnected. 
Delete the node before assigning the primary role to a different node."); - } - } - - // change the primary node ID to the given node - setPrimaryNodeId(node.getNodeId()); - - // assign primary role - if (assignPrimaryRole(node.getNodeId())) { - addEvent(node.getNodeId(), "Role assigned to this node as part of primary role reassignment. Action performed by " + userDn); - addBulletin(node, Severity.INFO, "Primary Role assigned to node by " + userDn); - } else { - throw new PrimaryRoleAssignmentException( - "Cluster manager assigned primary role to node, but the node failed to accept the assignment. Cluster manager disconnected node."); - } - } finally { - writeLock.unlock("setPrimaryNode"); - } - } - - private int getClusterProtocolHeartbeatSeconds() { - return (int) FormatUtils.getTimeDuration(properties.getClusterProtocolHeartbeatInterval(), TimeUnit.SECONDS); - } - - @Override - public int getHeartbeatMonitoringIntervalSeconds() { - return 4 * getClusterProtocolHeartbeatSeconds(); - } - - @Override - public int getMaxHeartbeatGapSeconds() { - return 8 * getClusterProtocolHeartbeatSeconds(); - } - - @Override - public List<Event> getNodeEvents(final String nodeId) { - readLock.lock(); - try { - List<Event> events = null; - final EventManager eventMgr = eventManager; - if (eventMgr != null) { - events = eventMgr.getEvents(nodeId); - } - - if (events == null) { - return Collections.emptyList(); - } else { - return Collections.unmodifiableList(events); - } - } finally { - readLock.unlock("getNodeEvents"); - } - } - - @Override - public NodeResponse applyRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers) - throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException { - return applyRequest(method, uri, parameters, headers, getNodeIds(Status.CONNECTED)); - } - - @Override - public NodeResponse applyRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers, final Set<NodeIdentifier> nodeIdentifiers) - throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException { - - final boolean mutableRequest = canChangeNodeState(method, uri); - final ClusterManagerLock lock = mutableRequest ? 
writeLock : readLock; - - lock.lock(); - try { - // check that the request can be applied - if (mutableRequest) { - if (isInSafeMode()) { - throw new SafeModeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while in safe mode"); - } else if (!getNodeIds(Status.DISCONNECTED, Status.DISCONNECTING).isEmpty()) { - throw new DisconnectedNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is disconnected from the cluster"); - } else if (!getNodeIds(Status.CONNECTING).isEmpty()) { - // if any node is connecting and a request can change the flow, then we throw an exception - throw new ConnectingNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is trying to connect to the cluster"); - } - } - - final NodeResponse clientResponse = federateRequest(method, uri, parameters, null, headers, nodeIdentifiers); - if (clientResponse == null) { - if (mutableRequest) { - throw new NoConnectedNodesException(String.format("All nodes were disconnected as a result of applying request %s %s", method, uri)); - } else { - throw new NoResponseFromNodesException("No nodes were able to process this request."); - } - } else { - return clientResponse; - } - } finally { - lock.unlock("applyRequest(String, URI, Map<String, List<String>>, Map<String
<TRUNCATED>
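For reference, the heartbeat handling shown above follows a queue-drain pattern: handleHeartbeat() only enqueues, and processPendingHeartbeats() later takes the write lock, drains the queue, and keeps only the latest heartbeat per node before doing the expensive status work. A minimal, self-contained sketch of that pattern is below; the class and member names (HeartbeatDrainSketch, nodeId, latestByNode) are illustrative stand-ins, not NiFi's actual types.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class HeartbeatDrainSketch {

    // Hypothetical stand-in for the heartbeat message type.
    static final class Heartbeat {
        final String nodeId;
        Heartbeat(final String nodeId) {
            this.nodeId = nodeId;
        }
    }

    private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Called from the protocol thread: cheap, never waits for the write lock.
    void handleHeartbeat(final Heartbeat heartbeat) {
        if (heartbeat == null) {
            throw new IllegalArgumentException("Heartbeat may not be null.");
        }
        pendingHeartbeats.add(heartbeat);
    }

    // Called periodically: drains the queue so only the latest heartbeat per node is processed.
    void processPendingHeartbeats() {
        lock.writeLock().lock();
        try {
            final Map<String, Heartbeat> latestByNode = new HashMap<>();
            Heartbeat polled;
            while ((polled = pendingHeartbeats.poll()) != null) {
                latestByNode.put(polled.nodeId, polled); // later entries overwrite earlier ones
            }

            final Collection<Heartbeat> latest = new ArrayList<>(latestByNode.values());
            if (latest.isEmpty()) {
                return; // nothing to do
            }

            for (final Heartbeat heartbeat : latest) {
                // per-node status transitions (connect, disconnect, reconnect) would happen here
                System.out.println("Processing heartbeat from node " + heartbeat.nodeId);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }
}

The per-node overwrite in the map is what keeps the work under the write lock proportional to the number of nodes rather than the number of queued heartbeats.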