http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
new file mode 100644
index 0000000..1443737
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.nifi.cluster.ConnectionException;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import 
org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.file.FileUtils;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.lifecycle.LifeCycleStartException;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.persistence.FlowConfigurationDAO;
+import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.services.FlowService;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardFlowService implements FlowService, ProtocolHandler {
+
    // Category label for controller-level events; presumably used as the bulletin
    // category elsewhere in this class — TODO confirm against the full file.
    private static final String EVENT_CATEGORY = "Controller";

    private final FlowController controller;
    private final Path flowXml;           // gzipped flow configuration file (see copyCurrentFlow/overwriteFlow)
    private final Path taskConfigXml;     // reporting-task configuration file
    private final Path serviceConfigXml;  // controller-service configuration file
    private final FlowConfigurationDAO dao;
    private final int gracefulShutdownSeconds;   // how long stop() waits for the save executor
    private final boolean autoResumeState;       // whether to auto-start components after load
    private final int connectionRetryMillis;     // sleep between cluster connection attempts
    private final StringEncryptor encryptor;

    // Lock is used to protect the flow.xml file.
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    // true between start() and stop(); transitions happen under writeLock
    private final AtomicBoolean running = new AtomicBoolean(false);
    // executor running the periodic SaveReportingTask; set in start(), shut down in stop()
    private final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<>(null);
    // most recent pending save request (time + archive flag); set by saveFlowChanges(delay)
    private final AtomicReference<SaveHolder> saveHolder = new AtomicReference<>(null);

    /**
     * listener/sender for internal cluster communication; null when not
     * configured for clustering
     */
    private final NodeProtocolSenderListener senderListener;

    /**
     * flag indicating whether we are operating in a clustered environment
     */
    private final boolean configuredForClustering;

    /**
     * the node identifier; initially a random UUID, replaced by the cluster
     * manager's assigned identifier in loadFromConnectionResponse
     */
    private NodeIdentifier nodeId;

    // guardedBy rwLock
    private boolean firstControllerInitialization = true;

    private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to connect node to cluster because ";
    private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class);
+
+    public static StandardFlowService createStandaloneInstance(
+            final FlowController controller,
+            final NiFiProperties properties,
+            final StringEncryptor encryptor) throws IOException {
+        return new StandardFlowService(
+                controller,
+                properties,
+                /* nodeProtocolSenderListener */ null,
+                encryptor,
+                /* configuredForClustering */ false);
+    }
+
+    public static StandardFlowService createClusteredInstance(
+            final FlowController controller,
+            final NiFiProperties properties,
+            final NodeProtocolSenderListener senderListener,
+            final StringEncryptor encryptor) throws IOException {
+        return new StandardFlowService(
+                controller,
+                properties,
+                senderListener,
+                encryptor,
+                /* configuredForClustering */ true);
+    }
+
+    private StandardFlowService(
+            final FlowController controller,
+            final NiFiProperties properties,
+            final NodeProtocolSenderListener senderListener,
+            final StringEncryptor encryptor,
+            final boolean configuredForClustering) throws IOException {
+
+        this.controller = controller;
+        this.encryptor = encryptor;
+        flowXml = 
Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE));
+        taskConfigXml = 
Paths.get(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
+        serviceConfigXml = 
Paths.get(properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE));
+
+        gracefulShutdownSeconds = (int) 
FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD),
 TimeUnit.SECONDS);
+        autoResumeState = properties.getAutoResumeState();
+        connectionRetryMillis = (int) 
FormatUtils.getTimeDuration(properties.getClusterManagerFlowRetrievalDelay(), 
TimeUnit.MILLISECONDS);
+
+        dao = new StandardXMLFlowConfigurationDAO(flowXml, taskConfigXml, 
serviceConfigXml, encryptor);
+
+        if (configuredForClustering) {
+
+            this.configuredForClustering = configuredForClustering;
+
+            this.senderListener = senderListener;
+            senderListener.addHandler(this);
+
+            final InetSocketAddress nodeApiAddress = 
properties.getNodeApiAddress();
+            final InetSocketAddress nodeSocketAddress = 
properties.getClusterNodeProtocolAddress();
+
+            // use a random UUID as the proposed node identifier
+            this.nodeId = new NodeIdentifier(UUID.randomUUID().toString(), 
nodeApiAddress.getHostName(), nodeApiAddress.getPort(), 
nodeSocketAddress.getHostName(), nodeSocketAddress.getPort());
+
+        } else {
+            this.configuredForClustering = false;
+            this.senderListener = null;
+        }
+
+    }
+
    @Override
    public void saveFlowChanges() throws IOException {
        // Persist the current flow immediately via the DAO; the write lock
        // serializes all mutations of the flow.xml file.
        writeLock.lock();
        try {
            dao.save(controller);
        } finally {
            writeLock.unlock();
        }
    }
+
    @Override
    public void saveFlowChanges(final OutputStream outStream) throws IOException {
        // Serialize the current flow to the caller-supplied stream rather than
        // to flow.xml; still guarded by the write lock for a consistent snapshot.
        writeLock.lock();
        try {
            dao.save(controller, outStream);
        } finally {
            writeLock.unlock();
        }
    }
+
+    @Override
+    public void overwriteFlow(final InputStream is) throws IOException {
+        writeLock.lock();
+        try (final OutputStream output = Files.newOutputStream(flowXml, 
StandardOpenOption.WRITE, StandardOpenOption.CREATE);
+                final OutputStream gzipOut = new GZIPOutputStream(output);) {
+            FileUtils.copy(is, gzipOut);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
    @Override
    public void saveFlowChanges(final TimeUnit delayUnit, final long delay) {
        // Convenience overload: request a delayed save without archiving.
        saveFlowChanges(delayUnit, delay, false);
    }
+
+    @Override
+    public void saveFlowChanges(final TimeUnit delayUnit, final long delay, 
final boolean archive) {
+        final Calendar saveTime = Calendar.getInstance();
+        final long delayInMs = TimeUnit.MILLISECONDS.convert(delay, delayUnit);
+        int finalDelayMs = 500; //default to 500 ms.
+        if (delayInMs <= Integer.MAX_VALUE) {
+            finalDelayMs = (int) delayInMs;
+        }
+        saveTime.add(Calendar.MILLISECOND, finalDelayMs);
+
+        if (logger.isTraceEnabled()) {
+            logger.trace(" A request to save the flow has been made with delay 
{} for time {}", finalDelayMs, saveTime.getTime());
+        }
+
+        saveHolder.set(new SaveHolder(saveTime, archive));
+    }
+
    @Override
    public boolean isRunning() {
        // true between a successful start() and the next stop()
        return running.get();
    }
+
+    @Override
+    public void start() throws LifeCycleStartException {
+        writeLock.lock();
+        try {
+
+            if (isRunning()) {
+                return;
+            }
+
+            running.set(true);
+
+            final ScheduledExecutorService newExecutor = new FlowEngine(2, 
"Flow Service Tasks");
+            newExecutor.scheduleWithFixedDelay(new SaveReportingTask(), 0L, 
500L, TimeUnit.MILLISECONDS);
+            this.executor.set(newExecutor);
+
+            if (configuredForClustering) {
+                senderListener.start();
+            }
+
+        } catch (final IOException ioe) {
+            try {
+                stop(/* force */true);
+            } catch (final Exception e) {
+            }
+
+            throw new LifeCycleStartException("Failed to start Flow Service 
due to: " + ioe, ioe);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public void stop(final boolean force) {
+        writeLock.lock();
+        try {
+
+            if (!isRunning()) {
+                return;
+            }
+
+            running.set(false);
+
+            if (!controller.isTerminated()) {
+                controller.shutdown(force);
+            }
+
+            if (configuredForClustering && senderListener != null) {
+                try {
+                    senderListener.stop();
+                } catch (final IOException ioe) {
+                    logger.warn("Protocol sender/listener did not stop 
gracefully due to: " + ioe);
+                }
+            }
+
+            final ScheduledExecutorService executorService = executor.get();
+            if (executorService != null) {
+                if (force) {
+                    executorService.shutdownNow();
+                } else {
+                    executorService.shutdown();
+                }
+
+                boolean graceful;
+                try {
+                    graceful = 
executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS);
+                } catch (final InterruptedException e) {
+                    graceful = false;
+                }
+
+                if (!graceful) {
+                    logger.warn("Scheduling service did not gracefully 
shutdown within configured " + gracefulShutdownSeconds + " second window");
+                }
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean canHandle(final ProtocolMessage msg) {
+        switch (msg.getType()) {
+            case RECONNECTION_REQUEST:
+            case DISCONNECTION_REQUEST:
+            case FLOW_REQUEST:
+            case PRIMARY_ROLE:
+                return true;
+            default:
+                return false;
+        }
+    }
+
    /**
     * Dispatches an incoming cluster protocol message. Flow requests are
     * answered synchronously; reconnect/disconnect/primary-role work is handed
     * to a new thread so this method can return a response promptly.
     */
    @Override
    public ProtocolMessage handle(final ProtocolMessage request) throws ProtocolException {
        final long startNanos = System.nanoTime();
        try {
            switch (request.getType()) {
                case FLOW_REQUEST:
                    return handleFlowRequest((FlowRequestMessage) request);
                case RECONNECTION_REQUEST:
                    // Suspend heartbeats until we've reconnected. Otherwise,
                    // we may send a heartbeat while we are still in the process of
                    // connecting, which will cause the Cluster Manager to mark us
                    // as "Connected," which becomes problematic as the FlowController's lock
                    // may still be held, causing this node to take a long time to respond to requests.
                    controller.suspendHeartbeats();

                    // reconnection acquires the write lock and can take a while,
                    // so run it asynchronously and acknowledge immediately
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            handleReconnectionRequest((ReconnectionRequestMessage) request);
                        }
                    }, "Reconnect to Cluster").start();

                    return new ReconnectionResponseMessage();
                case DISCONNECTION_REQUEST:
                    // asynchronous; no response message is expected for a disconnect
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            handleDisconnectionRequest((DisconnectMessage) request);
                        }
                    }, "Disconnect from Cluster").start();

                    return null;
                case PRIMARY_ROLE:
                    // asynchronous; no response message is expected for a role change
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            handlePrimaryRoleAssignment((PrimaryRoleAssignmentMessage) request);
                        }
                    }, "Set Primary Role Status").start();
                    return null;
                default:
                    throw new ProtocolException("Handler cannot handle message type: " + request.getType());
            }
        } finally {
            if (logger.isDebugEnabled()) {
                final long procNanos = System.nanoTime() - startNanos;
                final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
                logger.debug("Finished Processing Protocol Message of type {} in {} millis", request.getType(), procMillis);
            }
        }
    }
+
    /**
     * Loads the given (possibly null) proposed dataflow. In clustered mode,
     * first attempts a cluster handshake: a successful response supplies the
     * cluster's flow; no response means "load locally and heartbeat until a
     * manager appears". In standalone mode the proposed flow is loaded directly.
     */
    @Override
    public void load(final DataFlow proposedFlow) throws IOException, FlowSerializationException,
            FlowSynchronizationException, UninheritableFlowException {
        writeLock.lock();
        try {
            if (configuredForClustering) {
                /*
                 * Attempt to connect to the cluster.  If the manager is able to
                 * provide a data flow, then the manager will send a connection
                 * response.  If the manager was unable to be located, then
                 * the response will be null and we should load the local dataflow
                 * and heartbeat until a manager is located.
                 */
                // retry on comms failure / retry indefinitely only when the local flow is empty
                final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow, encryptor);
                final ConnectionResponse response = connect(localFlowEmpty, localFlowEmpty);

                if (response == null) {
                    logger.info("Flow controller will load local dataflow and suspend connection handshake until a cluster connection response is received.");

                    // load local proposed flow; an empty local flow is not acceptable here
                    loadFromBytes(proposedFlow, false);

                    // set node ID on controller before we start heartbeating because heartbeat needs node ID
                    controller.setNodeId(nodeId);

                    // set node as clustered, since it is trying to connect to a cluster
                    controller.setClustered(true, null);
                    controller.setClusterManagerRemoteSiteInfo(null, null);
                    controller.setConnected(false);

                    /*
                     * Start heartbeating.  Heartbeats will fail because we can't reach
                     * the manager, but when we locate the manager, the node will
                     * reconnect and establish a connection to the cluster.  The
                     * heartbeat is the trigger that will cause the manager to
                     * issue a reconnect request.
                     */
                    controller.startHeartbeating();

                    // if configured, start all components
                    if (autoResumeState) {
                        try {
                            controller.startDelayed();
                        } catch (final Exception ex) {
                            // best-effort: a bad component must not prevent the node from loading
                            logger.warn("Unable to start all processors due to invalid flow configuration.");
                            if (logger.isDebugEnabled()) {
                                logger.warn(StringUtils.EMPTY, ex);
                            }
                        }
                    }

                } else {
                    try {
                        loadFromConnectionResponse(response);
                    } catch (final ConnectionException ce) {
                        logger.error("Failed to load flow from cluster due to: " + ce, ce);

                        /*
                         * If we failed processing the response, then we want to notify
                         * the manager so that it can mark the node as disconnected.
                         */
                        // create error message
                        final ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
                        msg.setExceptionMessage(ce.getMessage());
                        msg.setNodeId(response.getNodeIdentifier());

                        // send error message to manager (best-effort)
                        try {
                            senderListener.notifyControllerStartupFailure(msg);
                        } catch (final ProtocolException | UnknownServiceAddressException e) {
                            logger.warn("Failed to notify cluster manager of controller startup failure due to: " + e, e);
                        }

                        throw new IOException(ce);
                    }
                }
            } else {
                // operating in standalone mode, so load proposed flow; empty flow allowed
                loadFromBytes(proposedFlow, true);
            }

        } finally {
            writeLock.unlock();
        }
    }
+
+    private FlowResponseMessage handleFlowRequest(final FlowRequestMessage 
request) throws ProtocolException {
+        readLock.lock();
+        try {
+            logger.info("Received flow request message from manager.");
+
+            // serialize the flow to the output stream
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            copyCurrentFlow(baos);
+            final byte[] flowBytes = baos.toByteArray();
+            baos.reset();
+
+            final byte[] templateBytes = 
controller.getTemplateManager().export();
+            final byte[] snippetBytes = 
controller.getSnippetManager().export();
+
+            // create the response
+            final FlowResponseMessage response = new FlowResponseMessage();
+
+            response.setDataFlow(new StandardDataFlow(flowBytes, 
templateBytes, snippetBytes));
+
+            return response;
+
+        } catch (final Exception ex) {
+            throw new ProtocolException("Failed serializing flow controller 
state for flow request due to: " + ex, ex);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
    // Applies the manager-assigned primary-node status to the controller.
    private void handlePrimaryRoleAssignment(final PrimaryRoleAssignmentMessage msg) {
        writeLock.lock();
        try {
            controller.setPrimary(msg.isPrimary());
        } finally {
            writeLock.unlock();
        }
    }
+
    /**
     * Processes a manager-issued reconnection request: rebuilds a connection
     * response from the message, reloads the flow from it, and resumes
     * heartbeats. On any failure the node disconnects itself and notifies the
     * manager of the failure (best-effort).
     */
    private void handleReconnectionRequest(final ReconnectionRequestMessage request) {
        writeLock.lock();
        try {
            logger.info("Processing reconnection request from manager.");

            // reconnect using the flow/primary/site info the manager provided
            final ConnectionResponse connectionResponse = new ConnectionResponse(nodeId, request.getDataFlow(), request.isPrimary(),
                    request.getManagerRemoteSiteListeningPort(), request.isManagerRemoteSiteCommsSecure(), request.getInstanceId());
            connectionResponse.setClusterManagerDN(request.getRequestorDN());
            loadFromConnectionResponse(connectionResponse);

            controller.resumeHeartbeats();  // we are now connected, so resume sending heartbeats.

            logger.info("Node reconnected.");
        } catch (final Exception ex) {
            // disconnect controller so we don't stay in a half-connected state
            if (controller.isClustered()) {
                disconnect();
            }

            logger.error("Handling reconnection request failed due to: " + ex, ex);

            final ReconnectionFailureMessage failureMessage = new ReconnectionFailureMessage();
            failureMessage.setNodeId(request.getNodeId());
            failureMessage.setExceptionMessage(ex.toString());

            // send error message to manager (best-effort)
            try {
                senderListener.notifyReconnectionFailure(failureMessage);
            } catch (final ProtocolException | UnknownServiceAddressException e) {
                logger.warn("Failed to notify cluster manager of controller reconnection failure due to: " + e, e);
            }
        } finally {
            writeLock.unlock();
        }
    }
+
    // The manager asked this node to leave the cluster: log the reason and
    // disconnect (disconnect() re-acquires the reentrant write lock).
    private void handleDisconnectionRequest(final DisconnectMessage request) {
        writeLock.lock();
        try {
            logger.info("Received disconnection request message from manager with explanation: " + request.getExplanation());
            disconnect();
        } finally {
            writeLock.unlock();
        }
    }
+
    /**
     * Transitions this node out of the cluster: clears the connected and
     * primary flags, stops heartbeating, and marks the controller as
     * non-clustered. Statement order matters — heartbeats must stop before the
     * clustered flag is cleared.
     */
    private void disconnect() {
        writeLock.lock();
        try {

            logger.info("Disconnecting node.");

            // mark node as not connected
            controller.setConnected(false);

            // turn off primary flag
            controller.setPrimary(false);

            // stop heartbeating
            controller.stopHeartbeating();

            // set node to not clustered
            controller.setClustered(false, null);

            logger.info("Node disconnected.");

        } finally {
            writeLock.unlock();
        }
    }
+
    // write lock must already be acquired
    /**
     * Loads a flow into the controller. A null {@code proposedFlow} means
     * "reload whatever is on disk". On the first invocation only, controller
     * services and reporting tasks are loaded and the flow is initialized.
     *
     * @param proposedFlow flow to load, or null to use the on-disk flow
     * @param allowEmptyFlow if false, an empty resulting root group is an error
     */
    private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow)
            throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
        logger.trace("Loading flow from bytes");
        final TemplateManager templateManager = controller.getTemplateManager();
        templateManager.loadTemplates();
        logger.trace("Finished loading templates");

        // resolve the given flow (null means load flow from disk)
        final DataFlow actualProposedFlow;
        final byte[] flowBytes;
        final byte[] templateBytes;
        if (proposedFlow == null) {
            final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream();
            copyCurrentFlow(flowOnDisk);
            flowBytes = flowOnDisk.toByteArray();
            templateBytes = templateManager.export();
            logger.debug("Loaded Flow from bytes");
        } else {
            flowBytes = proposedFlow.getFlow();
            templateBytes = proposedFlow.getTemplates();
            logger.debug("Loaded flow from proposed flow");
        }

        actualProposedFlow = new StandardDataFlow(flowBytes, templateBytes, null);

        if (firstControllerInitialization) {
            // load the controller services before the flow, which may reference them
            logger.debug("Loading controller services");
            dao.loadControllerServices(controller);
        }

        // load the flow
        logger.debug("Loading proposed flow into FlowController");
        dao.load(controller, actualProposedFlow);

        final ProcessGroup rootGroup = controller.getGroup(controller.getRootGroupId());
        if (rootGroup.isEmpty() && !allowEmptyFlow) {
            throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty");
        }

        // lazy initialization of controller tasks and flow
        if (firstControllerInitialization) {
            logger.debug("First controller initialization. Loading reporting tasks and initializing controller.");

            // load the controller tasks
            dao.loadReportingTasks(controller);

            // initialize the flow
            controller.initializeFlow();

            firstControllerInitialization = false;
        }
    }
+
+    private ConnectionResponse connect(final boolean retryOnCommsFailure, 
final boolean retryIndefinitely) throws ConnectionException {
+        writeLock.lock();
+        try {
+            logger.info("Connecting Node: " + nodeId);
+
+            // create connection request message
+            final ConnectionRequest request = new ConnectionRequest(nodeId);
+            final ConnectionRequestMessage requestMsg = new 
ConnectionRequestMessage();
+            requestMsg.setConnectionRequest(request);
+
+            // send connection request to cluster manager
+            /*
+             * Try to get a current copy of the cluster's dataflow from the 
manager 
+             * for ten times, sleeping between attempts.  Ten times should be 
+             * enough because the manager will register the node as connecting
+             * and therefore, no other changes to the cluster flow can occur.
+             * 
+             * However, the manager needs to obtain a current data flow within
+             * maxAttempts * tryLaterSeconds or else the node will fail to 
startup.
+             */
+            final int maxAttempts = 10;
+            ConnectionResponse response = null;
+            for (int i = 0; i < maxAttempts || retryIndefinitely; i++) {
+                try {
+                    response = 
senderListener.requestConnection(requestMsg).getConnectionResponse();
+                    if (response.isBlockedByFirewall()) {
+                        logger.warn("Connection request was blocked by cluster 
manager's firewall.");
+                        // set response to null and treat a firewall blockage 
the same as getting no response from manager
+                        response = null;
+                        break;
+                    } else if (response.shouldTryLater()) {
+                        logger.info("Flow controller requested by cluster 
manager to retry connection in " + response.getTryLaterSeconds() + " seconds.");
+                        try {
+                            Thread.sleep(response.getTryLaterSeconds() * 1000);
+                        } catch (final InterruptedException ie) {
+                            // we were interrupted, so finish quickly
+                            break;
+                        }
+                    } else {
+                        // we received a successful connection response from 
manager
+                        break;
+                    }
+
+                } catch (final Exception pe) {
+                    // could not create a socket and communicate with manager
+                    logger.warn("Failed to connect to cluster due to: " + pe, 
pe);
+                    if (retryOnCommsFailure) {
+                        try {
+                            Thread.sleep(connectionRetryMillis);
+                        } catch (final InterruptedException ie) {
+                            break;
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            if (response == null) {
+                // if response is null, then either we had IO problems or we 
were blocked by firewall or we couldn't determine manager's address
+                return response;
+            } else if (response.shouldTryLater()) {
+                // if response indicates we should try later, then manager was 
unable to service our request. Just load local flow and move on.
+                return null;
+            } else {
+                // cluster manager provided a successful response with a 
current dataflow
+                return response;
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
    /**
     * Applies a successful connection response: loads the cluster's dataflow,
     * adopts the manager-assigned node ID, marks the node clustered/connected,
     * applies primary status, optionally auto-starts processors, imports
     * templates and snippets, and begins heartbeating. Failures are translated
     * into {@link ConnectionException}s with specific explanations.
     */
    private void loadFromConnectionResponse(final ConnectionResponse response) throws ConnectionException {
        writeLock.lock();
        try {

            // get the dataflow from the response
            final DataFlow dataFlow = response.getDataFlow();

            // load new controller state; empty flow from the cluster is acceptable
            loadFromBytes(dataFlow, true);

            // set node ID on controller before we start heartbeating because heartbeat needs node ID
            nodeId = response.getNodeIdentifier();
            logger.info("Setting Flow Controller's Node ID: " + nodeId);
            controller.setNodeId(nodeId);

            // mark the node as clustered
            controller.setClustered(true, response.getInstanceId(), response.getClusterManagerDN());
            controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.isManagerRemoteCommsSecure());

            controller.setConnected(true);

            // set primary
            controller.setPrimary(response.isPrimary());

            // start the processors as indicated by the dataflow
            if (dataFlow.isAutoStartProcessors()) {
                controller.startDelayed();
            }

            loadTemplates(dataFlow.getTemplates());
            loadSnippets(dataFlow.getSnippets());
            controller.startHeartbeating();
        } catch (final UninheritableFlowException ufe) {
            throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
        } catch (final FlowSerializationException fse) {
            throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local or cluster flow is malformed.", fse);
        } catch (final FlowSynchronizationException fse) {
            throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow controller partially updated.  Administrator should disconnect node and review flow for corruption.", fse);
        } catch (final Exception ex) {
            throw new ConnectionException("Failed to connect node to cluster due to: " + ex, ex);
        } finally {
            writeLock.unlock();
        }

    }
+
+    @Override
+    public void copyCurrentFlow(final OutputStream os) throws IOException {
+        readLock.lock();
+        try {
+            if (!Files.exists(flowXml) || Files.size(flowXml) == 0) {
+                return;
+            }
+
+            try (final InputStream in = Files.newInputStream(flowXml, 
StandardOpenOption.READ);
+                    final InputStream gzipIn = new GZIPInputStream(in)) {
+                FileUtils.copy(gzipIn, os);
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public void loadTemplates(final byte[] bytes) throws IOException {
+        if (bytes.length == 0) {
+            return;
+        }
+
+        controller.clearTemplates();
+
+        for (final Template template : TemplateManager.parseBytes(bytes)) {
+            controller.addTemplate(template.getDetails());
+        }
+    }
+
+    public void loadSnippets(final byte[] bytes) throws IOException {
+        if (bytes.length == 0) {
+            return;
+        }
+
+        final SnippetManager snippetManager = controller.getSnippetManager();
+        snippetManager.clear();
+
+        for (final StandardSnippet snippet : SnippetManager.parseBytes(bytes)) 
{
+            snippetManager.addSnippet(snippet);
+        }
+    }
+
    /**
     * @return the {@link FlowController} managed by this flow service
     */
    @Override
    public FlowController getController() {
        return controller;
    }
+
+    private class SaveReportingTask implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                //Hang onto the SaveHolder here rather than setting it to null 
because if the save fails we will try again
+                final SaveHolder holder = 
StandardFlowService.this.saveHolder.get();
+                if (holder == null) {
+                    return;
+                }
+
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Save request time {} // Current time {}", 
holder.saveTime.getTime(), new Date());
+                }
+
+                final Calendar now = Calendar.getInstance();
+                if (holder.saveTime.before(now) || holder.shouldArchive) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Waiting for write lock and then will 
save");
+                    }
+                    writeLock.lock();
+                    try {
+                        dao.save(controller, holder.shouldArchive);
+                        // Nulling it out if it is still set to our current 
SaveHolder.  Otherwise leave it alone because it means 
+                        // another save is already pending.
+                        final boolean noSavePending = 
StandardFlowService.this.saveHolder.compareAndSet(holder, null);
+                        logger.info("Saved flow controller {} // Another save 
pending = {}", controller, !noSavePending);
+                    } finally {
+                        writeLock.unlock();
+                    }
+                }
+            } catch (final Throwable t) {
+                logger.error("Unable to save flow controller configuration due 
to: " + t, t);
+                if (logger.isDebugEnabled()) {
+                    logger.error("", t);
+                }
+
+                // record the failed save as a bulletin
+                final Bulletin saveFailureBulletin = 
BulletinFactory.createBulletin(EVENT_CATEGORY, LogLevel.ERROR.name(), "Unable 
to save flow controller configuration.");
+                
controller.getBulletinRepository().addBulletin(saveFailureBulletin);
+            }
+        }
+    }
+
+    private class SaveHolder {
+
+        private final Calendar saveTime;
+        private final boolean shouldArchive;
+
+        private SaveHolder(final Calendar moment, final boolean archive) {
+            saveTime = moment;
+            shouldArchive = archive;
+        }
+    }
+
    /**
     * @return whether this node is currently the cluster's primary node, as
     *         reported by the flow controller; read under the read lock
     */
    public boolean isPrimary() {
        readLock.lock();
        try {
            return controller.isPrimary();
        } finally {
            readLock.unlock();
        }
    }
+
+    public void setPrimary(boolean primary) {
+        writeLock.lock();
+        try {
+            controller.setPrimary(primary);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+}

Reply via email to