http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
deleted file mode 100644
index d459b00..0000000
---
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-import org.apache.nifi.cluster.ConnectionException;
-import org.apache.nifi.cluster.protocol.ConnectionRequest;
-import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.DataFlow;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import
org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
-import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.util.file.FileUtils;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.lifecycle.LifeCycleStartException;
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.persistence.FlowConfigurationDAO;
-import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.services.FlowService;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StandardFlowService implements FlowService, ProtocolHandler {
-
- private static final String EVENT_CATEGORY = "Controller";
-
- private final FlowController controller;
- private final Path flowXml;
- private final Path taskConfigXml;
- private final Path serviceConfigXml;
- private final FlowConfigurationDAO dao;
- private final int gracefulShutdownSeconds;
- private final boolean autoResumeState;
- private final int connectionRetryMillis;
- private final StringEncryptor encryptor;
-
- // Lock is used to protect the flow.xml file.
- private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock();
- private final Lock writeLock = rwLock.writeLock();
-
- private final AtomicBoolean running = new AtomicBoolean(false);
- private final AtomicReference<ScheduledExecutorService> executor = new
AtomicReference<>(null);
- private final AtomicReference<SaveHolder> saveHolder = new
AtomicReference<>(null);
-
- /**
- * listener/sender for internal cluster communication
- */
- private final NodeProtocolSenderListener senderListener;
-
- /**
- * flag indicating whether we are operating in a clustered environment
- */
- private final boolean configuredForClustering;
-
- /**
- * the node identifier
- */
- private NodeIdentifier nodeId;
-
- // guardedBy rwLock
- private boolean firstControllerInitialization = true;
-
- private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to
connect node to cluster because ";
- private static final Logger logger =
LoggerFactory.getLogger(StandardFlowService.class);
-
- public static StandardFlowService createStandaloneInstance(
- final FlowController controller,
- final NiFiProperties properties,
- final StringEncryptor encryptor) throws IOException {
- return new StandardFlowService(
- controller,
- properties,
- /* nodeProtocolSenderListener */ null,
- encryptor,
- /* configuredForClustering */ false);
- }
-
- public static StandardFlowService createClusteredInstance(
- final FlowController controller,
- final NiFiProperties properties,
- final NodeProtocolSenderListener senderListener,
- final StringEncryptor encryptor) throws IOException {
- return new StandardFlowService(
- controller,
- properties,
- senderListener,
- encryptor,
- /* configuredForClustering */ true);
- }
-
- private StandardFlowService(
- final FlowController controller,
- final NiFiProperties properties,
- final NodeProtocolSenderListener senderListener,
- final StringEncryptor encryptor,
- final boolean configuredForClustering) throws IOException {
-
- this.controller = controller;
- this.encryptor = encryptor;
- flowXml =
Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE));
- taskConfigXml =
Paths.get(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
- serviceConfigXml =
Paths.get(properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE));
-
- gracefulShutdownSeconds = (int)
FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD),
TimeUnit.SECONDS);
- autoResumeState = properties.getAutoResumeState();
- connectionRetryMillis = (int)
FormatUtils.getTimeDuration(properties.getClusterManagerFlowRetrievalDelay(),
TimeUnit.MILLISECONDS);
-
- dao = new StandardXMLFlowConfigurationDAO(flowXml, taskConfigXml,
serviceConfigXml, encryptor);
-
- if (configuredForClustering) {
-
- this.configuredForClustering = configuredForClustering;
-
- this.senderListener = senderListener;
- senderListener.addHandler(this);
-
- final InetSocketAddress nodeApiAddress =
properties.getNodeApiAddress();
- final InetSocketAddress nodeSocketAddress =
properties.getClusterNodeProtocolAddress();
-
- // use a random UUID as the proposed node identifier
- this.nodeId = new NodeIdentifier(UUID.randomUUID().toString(),
nodeApiAddress.getHostName(), nodeApiAddress.getPort(),
nodeSocketAddress.getHostName(), nodeSocketAddress.getPort());
-
- } else {
- this.configuredForClustering = false;
- this.senderListener = null;
- }
-
- }
-
- @Override
- public void saveFlowChanges() throws IOException {
- writeLock.lock();
- try {
- dao.save(controller);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void saveFlowChanges(final OutputStream outStream) throws
IOException {
- writeLock.lock();
- try {
- dao.save(controller, outStream);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void overwriteFlow(final InputStream is) throws IOException {
- writeLock.lock();
- try (final OutputStream output = Files.newOutputStream(flowXml,
StandardOpenOption.WRITE, StandardOpenOption.CREATE);
- final OutputStream gzipOut = new GZIPOutputStream(output);) {
- FileUtils.copy(is, gzipOut);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void saveFlowChanges(final TimeUnit delayUnit, final long delay) {
- saveFlowChanges(delayUnit, delay, false);
- }
-
- @Override
- public void saveFlowChanges(final TimeUnit delayUnit, final long delay,
final boolean archive) {
- final Calendar saveTime = Calendar.getInstance();
- final long delayInMs = TimeUnit.MILLISECONDS.convert(delay, delayUnit);
- int finalDelayMs = 500; //default to 500 ms.
- if (delayInMs <= Integer.MAX_VALUE) {
- finalDelayMs = (int) delayInMs;
- }
- saveTime.add(Calendar.MILLISECOND, finalDelayMs);
-
- if (logger.isTraceEnabled()) {
- logger.trace(" A request to save the flow has been made with delay
{} for time {}", finalDelayMs, saveTime.getTime());
- }
-
- saveHolder.set(new SaveHolder(saveTime, archive));
- }
-
- @Override
- public boolean isRunning() {
- return running.get();
- }
-
- @Override
- public void start() throws LifeCycleStartException {
- writeLock.lock();
- try {
-
- if (isRunning()) {
- return;
- }
-
- running.set(true);
-
- final ScheduledExecutorService newExecutor = new FlowEngine(2,
"Flow Service Tasks");
- newExecutor.scheduleWithFixedDelay(new SaveReportingTask(), 0L,
500L, TimeUnit.MILLISECONDS);
- this.executor.set(newExecutor);
-
- if (configuredForClustering) {
- senderListener.start();
- }
-
- } catch (final IOException ioe) {
- try {
- stop(/* force */true);
- } catch (final Exception e) {
- }
-
- throw new LifeCycleStartException("Failed to start Flow Service
due to: " + ioe, ioe);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void stop(final boolean force) {
- writeLock.lock();
- try {
-
- if (!isRunning()) {
- return;
- }
-
- running.set(false);
-
- if (!controller.isTerminated()) {
- controller.shutdown(force);
- }
-
- if (configuredForClustering && senderListener != null) {
- try {
- senderListener.stop();
- } catch (final IOException ioe) {
- logger.warn("Protocol sender/listener did not stop
gracefully due to: " + ioe);
- }
- }
-
- final ScheduledExecutorService executorService = executor.get();
- if (executorService != null) {
- if (force) {
- executorService.shutdownNow();
- } else {
- executorService.shutdown();
- }
-
- boolean graceful;
- try {
- graceful =
executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS);
- } catch (final InterruptedException e) {
- graceful = false;
- }
-
- if (!graceful) {
- logger.warn("Scheduling service did not gracefully
shutdown within configured " + gracefulShutdownSeconds + " second window");
- }
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public boolean canHandle(final ProtocolMessage msg) {
- switch (msg.getType()) {
- case RECONNECTION_REQUEST:
- case DISCONNECTION_REQUEST:
- case FLOW_REQUEST:
- case PRIMARY_ROLE:
- return true;
- default:
- return false;
- }
- }
-
- @Override
- public ProtocolMessage handle(final ProtocolMessage request) throws
ProtocolException {
- final long startNanos = System.nanoTime();
- try {
- switch (request.getType()) {
- case FLOW_REQUEST:
- return handleFlowRequest((FlowRequestMessage) request);
- case RECONNECTION_REQUEST:
- // Suspend heartbeats until we've reconnected. Otherwise,
- // we may send a heartbeat while we are still in the
process of
- // connecting, which will cause the Cluster Manager to
mark us
- // as "Connected," which becomes problematic as the
FlowController's lock
- // may still be held, causing this node to take a long
time to respond to requests.
- controller.suspendHeartbeats();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
-
handleReconnectionRequest((ReconnectionRequestMessage) request);
- }
- }, "Reconnect to Cluster").start();
-
- return new ReconnectionResponseMessage();
- case DISCONNECTION_REQUEST:
- new Thread(new Runnable() {
- @Override
- public void run() {
- handleDisconnectionRequest((DisconnectMessage)
request);
- }
- }, "Disconnect from Cluster").start();
-
- return null;
- case PRIMARY_ROLE:
- new Thread(new Runnable() {
- @Override
- public void run() {
-
handlePrimaryRoleAssignment((PrimaryRoleAssignmentMessage) request);
- }
- }, "Set Primary Role Status").start();
- return null;
- default:
- throw new ProtocolException("Handler cannot handle message
type: " + request.getType());
- }
- } finally {
- if (logger.isDebugEnabled()) {
- final long procNanos = System.nanoTime() - startNanos;
- final long procMillis =
TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
- logger.debug("Finished Processing Protocol Message of type {}
in {} millis", request.getType(), procMillis);
- }
- }
- }
-
- @Override
- public void load(final DataFlow proposedFlow) throws IOException,
FlowSerializationException,
- FlowSynchronizationException, UninheritableFlowException {
- writeLock.lock();
- try {
- if (configuredForClustering) {
- /*
- * Attempt to connect to the cluster. If the manager is able
to
- * provide a data flow, then the manager will send a connection
- * response. If the manager was unable to be located, then
- * the response will be null and we should load the local
dataflow
- * and heartbeat until a manager is located.
- */
- final boolean localFlowEmpty =
StandardFlowSynchronizer.isEmpty(proposedFlow, encryptor);
- final ConnectionResponse response = connect(localFlowEmpty,
localFlowEmpty);
-
- if (response == null) {
- logger.info("Flow controller will load local dataflow and
suspend connection handshake until a cluster connection response is received.");
-
- // load local proposed flow
- loadFromBytes(proposedFlow, false);
-
- // set node ID on controller before we start heartbeating
because heartbeat needs node ID
- controller.setNodeId(nodeId);
-
- // set node as clustered, since it is trying to connect to
a cluster
- controller.setClustered(true, null);
- controller.setClusterManagerRemoteSiteInfo(null, null);
- controller.setConnected(false);
-
- /*
- * Start heartbeating. Heartbeats will fail because we
can't reach
- * the manager, but when we locate the manager, the node
will
- * reconnect and establish a connection to the cluster.
The
- * heartbeat is the trigger that will cause the manager to
- * issue a reconnect request.
- */
- controller.startHeartbeating();
-
- // if configured, start all components
- if (autoResumeState) {
- try {
- controller.startDelayed();
- } catch (final Exception ex) {
- logger.warn("Unable to start all processors due to
invalid flow configuration.");
- if (logger.isDebugEnabled()) {
- logger.warn(StringUtils.EMPTY, ex);
- }
- }
- }
-
- } else {
- try {
- loadFromConnectionResponse(response);
- } catch (final ConnectionException ce) {
- logger.error("Failed to load flow from cluster due to:
" + ce, ce);
-
- /*
- * If we failed processing the response, then we want
to notify
- * the manager so that it can mark the node as
disconnected.
- */
- // create error message
- final ControllerStartupFailureMessage msg = new
ControllerStartupFailureMessage();
- msg.setExceptionMessage(ce.getMessage());
- msg.setNodeId(response.getNodeIdentifier());
-
- // send error message to manager
- try {
- senderListener.notifyControllerStartupFailure(msg);
- } catch (final ProtocolException |
UnknownServiceAddressException e) {
- logger.warn("Failed to notify cluster manager of
controller startup failure due to: " + e, e);
- }
-
- throw new IOException(ce);
- }
- }
- } else {
- // operating in standalone mode, so load proposed flow
- loadFromBytes(proposedFlow, true);
- }
-
- } finally {
- writeLock.unlock();
- }
- }
-
- private FlowResponseMessage handleFlowRequest(final FlowRequestMessage
request) throws ProtocolException {
- readLock.lock();
- try {
- logger.info("Received flow request message from manager.");
-
- // serialize the flow to the output stream
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- copyCurrentFlow(baos);
- final byte[] flowBytes = baos.toByteArray();
- baos.reset();
-
- final byte[] templateBytes =
controller.getTemplateManager().export();
- final byte[] snippetBytes =
controller.getSnippetManager().export();
-
- // create the response
- final FlowResponseMessage response = new FlowResponseMessage();
-
- response.setDataFlow(new StandardDataFlow(flowBytes,
templateBytes, snippetBytes));
-
- return response;
-
- } catch (final Exception ex) {
- throw new ProtocolException("Failed serializing flow controller
state for flow request due to: " + ex, ex);
- } finally {
- readLock.unlock();
- }
- }
-
- private void handlePrimaryRoleAssignment(final
PrimaryRoleAssignmentMessage msg) {
- writeLock.lock();
- try {
- controller.setPrimary(msg.isPrimary());
- } finally {
- writeLock.unlock();
- }
- }
-
- private void handleReconnectionRequest(final ReconnectionRequestMessage
request) {
- writeLock.lock();
- try {
- logger.info("Processing reconnection request from manager.");
-
- // reconnect
- final ConnectionResponse connectionResponse = new
ConnectionResponse(nodeId, request.getDataFlow(), request.isPrimary(),
- request.getManagerRemoteSiteListeningPort(),
request.isManagerRemoteSiteCommsSecure(), request.getInstanceId());
- connectionResponse.setClusterManagerDN(request.getRequestorDN());
- loadFromConnectionResponse(connectionResponse);
-
- controller.resumeHeartbeats(); // we are now connected, so resume
sending heartbeats.
-
- logger.info("Node reconnected.");
- } catch (final Exception ex) {
- // disconnect controller
- if (controller.isClustered()) {
- disconnect();
- }
-
- logger.error("Handling reconnection request failed due to: " + ex,
ex);
-
- final ReconnectionFailureMessage failureMessage = new
ReconnectionFailureMessage();
- failureMessage.setNodeId(request.getNodeId());
- failureMessage.setExceptionMessage(ex.toString());
-
- // send error message to manager
- try {
- senderListener.notifyReconnectionFailure(failureMessage);
- } catch (final ProtocolException | UnknownServiceAddressException
e) {
- logger.warn("Failed to notify cluster manager of controller
reconnection failure due to: " + e, e);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- private void handleDisconnectionRequest(final DisconnectMessage request) {
- writeLock.lock();
- try {
- logger.info("Received disconnection request message from manager
with explanation: " + request.getExplanation());
- disconnect();
- } finally {
- writeLock.unlock();
- }
- }
-
- private void disconnect() {
- writeLock.lock();
- try {
-
- logger.info("Disconnecting node.");
-
- // mark node as not connected
- controller.setConnected(false);
-
- // turn off primary flag
- controller.setPrimary(false);
-
- // stop heartbeating
- controller.stopHeartbeating();
-
- // set node to not clustered
- controller.setClustered(false, null);
-
- logger.info("Node disconnected.");
-
- } finally {
- writeLock.unlock();
- }
- }
-
- // write lock must already be acquired
- private void loadFromBytes(final DataFlow proposedFlow, final boolean
allowEmptyFlow)
- throws IOException, FlowSerializationException,
FlowSynchronizationException, UninheritableFlowException {
- logger.trace("Loading flow from bytes");
- final TemplateManager templateManager =
controller.getTemplateManager();
- templateManager.loadTemplates();
- logger.trace("Finished loading templates");
-
- // resolve the given flow (null means load flow from disk)
- final DataFlow actualProposedFlow;
- final byte[] flowBytes;
- final byte[] templateBytes;
- if (proposedFlow == null) {
- final ByteArrayOutputStream flowOnDisk = new
ByteArrayOutputStream();
- copyCurrentFlow(flowOnDisk);
- flowBytes = flowOnDisk.toByteArray();
- templateBytes = templateManager.export();
- logger.debug("Loaded Flow from bytes");
- } else {
- flowBytes = proposedFlow.getFlow();
- templateBytes = proposedFlow.getTemplates();
- logger.debug("Loaded flow from proposed flow");
- }
-
- actualProposedFlow = new StandardDataFlow(flowBytes, templateBytes,
null);
-
- if (firstControllerInitialization) {
- // load the controller services
- logger.debug("Loading controller services");
- dao.loadControllerServices(controller);
- }
-
- // load the flow
- logger.debug("Loading proposed flow into FlowController");
- dao.load(controller, actualProposedFlow);
-
- final ProcessGroup rootGroup =
controller.getGroup(controller.getRootGroupId());
- if (rootGroup.isEmpty() && !allowEmptyFlow) {
- throw new FlowSynchronizationException("Failed to load flow
because unable to connect to cluster and local flow is empty");
- }
-
- // lazy initialization of controller tasks and flow
- if (firstControllerInitialization) {
- logger.debug("First controller initialization. Loading reporting
tasks and initializing controller.");
-
- // load the controller tasks
- dao.loadReportingTasks(controller);
-
- // initialize the flow
- controller.initializeFlow();
-
- firstControllerInitialization = false;
- }
- }
-
- private ConnectionResponse connect(final boolean retryOnCommsFailure,
final boolean retryIndefinitely) throws ConnectionException {
- writeLock.lock();
- try {
- logger.info("Connecting Node: " + nodeId);
-
- // create connection request message
- final ConnectionRequest request = new ConnectionRequest(nodeId);
- final ConnectionRequestMessage requestMsg = new
ConnectionRequestMessage();
- requestMsg.setConnectionRequest(request);
-
- // send connection request to cluster manager
- /*
- * Try to get a current copy of the cluster's dataflow from the
manager
- * for ten times, sleeping between attempts. Ten times should be
- * enough because the manager will register the node as connecting
- * and therefore, no other changes to the cluster flow can occur.
- *
- * However, the manager needs to obtain a current data flow within
- * maxAttempts * tryLaterSeconds or else the node will fail to
startup.
- */
- final int maxAttempts = 10;
- ConnectionResponse response = null;
- for (int i = 0; i < maxAttempts || retryIndefinitely; i++) {
- try {
- response =
senderListener.requestConnection(requestMsg).getConnectionResponse();
- if (response.isBlockedByFirewall()) {
- logger.warn("Connection request was blocked by cluster
manager's firewall.");
- // set response to null and treat a firewall blockage
the same as getting no response from manager
- response = null;
- break;
- } else if (response.shouldTryLater()) {
- logger.info("Flow controller requested by cluster
manager to retry connection in " + response.getTryLaterSeconds() + " seconds.");
- try {
- Thread.sleep(response.getTryLaterSeconds() * 1000);
- } catch (final InterruptedException ie) {
- // we were interrupted, so finish quickly
- break;
- }
- } else {
- // we received a successful connection response from
manager
- break;
- }
-
- } catch (final Exception pe) {
- // could not create a socket and communicate with manager
- logger.warn("Failed to connect to cluster due to: " + pe,
pe);
- if (retryOnCommsFailure) {
- try {
- Thread.sleep(connectionRetryMillis);
- } catch (final InterruptedException ie) {
- break;
- }
- } else {
- break;
- }
- }
- }
-
- if (response == null) {
- // if response is null, then either we had IO problems or we
were blocked by firewall or we couldn't determine manager's address
- return response;
- } else if (response.shouldTryLater()) {
- // if response indicates we should try later, then manager was
unable to service our request. Just load local flow and move on.
- return null;
- } else {
- // cluster manager provided a successful response with a
current dataflow
- return response;
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- private void loadFromConnectionResponse(final ConnectionResponse response)
throws ConnectionException {
- writeLock.lock();
- try {
-
- // get the dataflow from the response
- final DataFlow dataFlow = response.getDataFlow();
-
- // load new controller state
- loadFromBytes(dataFlow, true);
-
- // set node ID on controller before we start heartbeating because
heartbeat needs node ID
- nodeId = response.getNodeIdentifier();
- logger.info("Setting Flow Controller's Node ID: " + nodeId);
- controller.setNodeId(nodeId);
-
- // mark the node as clustered
- controller.setClustered(true, response.getInstanceId(),
response.getClusterManagerDN());
-
controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(),
response.isManagerRemoteCommsSecure());
-
- controller.setConnected(true);
-
- // set primary
- controller.setPrimary(response.isPrimary());
-
- // start the processors as indicated by the dataflow
- if (dataFlow.isAutoStartProcessors()) {
- controller.startDelayed();
- }
-
- loadTemplates(dataFlow.getTemplates());
- loadSnippets(dataFlow.getSnippets());
- controller.startHeartbeating();
- } catch (final UninheritableFlowException ufe) {
- throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX +
"local flow is different than cluster flow.", ufe);
- } catch (final FlowSerializationException fse) {
- throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX +
"local or cluster flow is malformed.", fse);
- } catch (final FlowSynchronizationException fse) {
- throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX +
"local flow controller partially updated. Administrator should disconnect node
and review flow for corruption.", fse);
- } catch (final Exception ex) {
- throw new ConnectionException("Failed to connect node to cluster
due to: " + ex, ex);
- } finally {
- writeLock.unlock();
- }
-
- }
-
- @Override
- public void copyCurrentFlow(final OutputStream os) throws IOException {
- readLock.lock();
- try {
- if (!Files.exists(flowXml) || Files.size(flowXml) == 0) {
- return;
- }
-
- try (final InputStream in = Files.newInputStream(flowXml,
StandardOpenOption.READ);
- final InputStream gzipIn = new GZIPInputStream(in)) {
- FileUtils.copy(gzipIn, os);
- }
- } finally {
- readLock.unlock();
- }
- }
-
- public void loadTemplates(final byte[] bytes) throws IOException {
- if (bytes.length == 0) {
- return;
- }
-
- controller.clearTemplates();
-
- for (final Template template : TemplateManager.parseBytes(bytes)) {
- controller.addTemplate(template.getDetails());
- }
- }
-
- public void loadSnippets(final byte[] bytes) throws IOException {
- if (bytes.length == 0) {
- return;
- }
-
- final SnippetManager snippetManager = controller.getSnippetManager();
- snippetManager.clear();
-
- for (final StandardSnippet snippet : SnippetManager.parseBytes(bytes))
{
- snippetManager.addSnippet(snippet);
- }
- }
-
- @Override
- public FlowController getController() {
- return controller;
- }
-
- private class SaveReportingTask implements Runnable {
-
- @Override
- public void run() {
- try {
- //Hang onto the SaveHolder here rather than setting it to null
because if the save fails we will try again
- final SaveHolder holder =
StandardFlowService.this.saveHolder.get();
- if (holder == null) {
- return;
- }
-
- if (logger.isTraceEnabled()) {
- logger.trace("Save request time {} // Current time {}",
holder.saveTime.getTime(), new Date());
- }
-
- final Calendar now = Calendar.getInstance();
- if (holder.saveTime.before(now) || holder.shouldArchive) {
- if (logger.isTraceEnabled()) {
- logger.trace("Waiting for write lock and then will
save");
- }
- writeLock.lock();
- try {
- dao.save(controller, holder.shouldArchive);
- // Nulling it out if it is still set to our current
SaveHolder. Otherwise leave it alone because it means
- // another save is already pending.
- final boolean noSavePending =
StandardFlowService.this.saveHolder.compareAndSet(holder, null);
- logger.info("Saved flow controller {} // Another save
pending = {}", controller, !noSavePending);
- } finally {
- writeLock.unlock();
- }
- }
- } catch (final Throwable t) {
- logger.error("Unable to save flow controller configuration due
to: " + t, t);
- if (logger.isDebugEnabled()) {
- logger.error("", t);
- }
-
- // record the failed save as a bulletin
- final Bulletin saveFailureBulletin =
BulletinFactory.createBulletin(EVENT_CATEGORY, LogLevel.ERROR.name(), "Unable
to save flow controller configuration.");
-
controller.getBulletinRepository().addBulletin(saveFailureBulletin);
- }
- }
- }
-
- private class SaveHolder {
-
- private final Calendar saveTime;
- private final boolean shouldArchive;
-
- private SaveHolder(final Calendar moment, final boolean archive) {
- saveTime = moment;
- shouldArchive = archive;
- }
- }
-
- public boolean isPrimary() {
- readLock.lock();
- try {
- return controller.isPrimary();
- } finally {
- readLock.unlock();
- }
- }
-
- public void setPrimary(boolean primary) {
- writeLock.lock();
- try {
- controller.setPrimary(primary);
- } finally {
- writeLock.unlock();
- }
- }
-}