http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
new file mode 100644
index 0000000..4e3b932
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl.testutils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * Test-only protocol handler that accepts every message, records it, and
+ * hands the very same message back to the caller as the response.
+ */
+public class ReflexiveProtocolHandler implements ProtocolHandler {
+
+    /** Every message passed to {@link #handle(ProtocolMessage)}, in arrival order. */
+    private final List<ProtocolMessage> received = new ArrayList<>();
+
+    /**
+     * Accepts any message.
+     *
+     * @param msg the candidate message (not inspected)
+     * @return always {@code true}
+     */
+    @Override
+    public boolean canHandle(final ProtocolMessage msg) {
+        return true;
+    }
+
+    /**
+     * Records the message and returns it unchanged.
+     *
+     * @param msg the message to record
+     * @return the same instance that was passed in
+     * @throws ProtocolException declared by the interface; never thrown here
+     */
+    @Override
+    public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
+        received.add(msg);
+        return msg;
+    }
+
+    /**
+     * @return the live, modifiable list of messages handled so far
+     */
+    public List<ProtocolMessage> getMessages() {
+        return received;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/.gitignore
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-web/.gitignore 
b/nar-bundles/framework-bundle/framework/cluster-web/.gitignore
new file mode 100755
index 0000000..ea8c4bf
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-web/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-web/pom.xml 
b/nar-bundles/framework-bundle/framework/cluster-web/pom.xml
new file mode 100644
index 0000000..a7c39c6
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-web/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-framework-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>framework-cluster-web</artifactId>
+    <packaging>jar</packaging>
+    <name>NiFi Framework Cluster Web</name>
+    <description>The clustering software for communicating with the NiFi web api.</description>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>web-optimistic-locking</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-administration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-user-actions</artifactId>
+        </dependency>
+        
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
 
b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
new file mode 100644
index 0000000..44fb25a
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.context;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.nifi.action.Action;
+import org.apache.nifi.web.Revision;
+
+/**
+ * Contains contextual information about clustering that may be serialized
+ * between manager and node when communicating over HTTP. A context exposes a
+ * list of auditable actions, a {@link Revision} that can be read and replaced,
+ * a flag recording whether the request was sent by the cluster manager, and
+ * an id generation seed.
+ */
+public interface ClusterContext extends Serializable {
+    
+    /**
+     * Returns a list of auditable actions.  The list is modifiable
+     * and will never be null.
+     * @return a collection of actions
+     */
+    List<Action> getActions();
+    
+    Revision getRevision();
+    
+    void setRevision(Revision revision);
+    
+    /**
+     * @return true if the request was sent by the cluster manager; false otherwise
+     */
+    boolean isRequestSentByClusterManager();
+    
+    /**
+     * Sets the flag to indicate if a request was sent by the cluster manager.
+     * @param flag true if the request was sent by the cluster manager; false otherwise
+     */
+    void setRequestSentByClusterManager(boolean flag);
+    
+    /**
+     * Gets an id generation seed. This is used to ensure that nodes are able to generate the
+     * same id across the cluster. This is usually handled by the cluster manager creating the
+     * id, however for some actions (snippets, templates, etc) this is not possible.
+     * @return the seed to use when generating ids
+     */
+    String getIdGenerationSeed();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
 
b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
new file mode 100644
index 0000000..06907d2
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.context;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.nifi.action.Action;
+import org.apache.nifi.web.Revision;
+
+/**
+ * A basic implementation of the context that keeps all state in plain fields:
+ * a modifiable action list created empty with the instance, the revision, the
+ * sent-by-cluster-manager flag, and an id generation seed that is a random
+ * UUID string assigned once when the instance is constructed.
+ *
+ * NOTE(review): the class is Serializable but declares no serialVersionUID, so
+ * the default computed value applies - confirm that is intended before
+ * renaming fields or changing declared members.
+ */
+public class ClusterContextImpl implements ClusterContext, Serializable {
+
+    private final List<Action> actions = new ArrayList<>();
+    
+    private Revision revision;
+    
+    private boolean requestSentByClusterManager;
+    
+    private final String idGenerationSeed = UUID.randomUUID().toString();
+    
+    @Override
+    public List<Action> getActions() {
+        return actions;
+    }
+
+    @Override
+    public Revision getRevision() {
+        return revision;
+    }
+
+    @Override
+    public void setRevision(Revision revision) {
+        this.revision = revision;
+    }
+
+    @Override
+    public boolean isRequestSentByClusterManager() {
+        return requestSentByClusterManager;
+    }
+    
+    @Override
+    public void setRequestSentByClusterManager(boolean 
requestSentByClusterManager) {
+        this.requestSentByClusterManager = requestSentByClusterManager;
+    }
+
+    @Override
+    public String getIdGenerationSeed() {
+        return this.idGenerationSeed;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
 
b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
new file mode 100644
index 0000000..012e7c7
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.context;
+
+/**
+ * Static accessor for the {@link ClusterContext} bound to the calling thread.
+ */
+public class ClusterContextThreadLocal {
+
+    /** Per-thread storage for the active context. */
+    private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>();
+
+    /**
+     * Creates a new, empty context without binding it to any thread.
+     *
+     * @return a fresh {@link ClusterContextImpl}
+     */
+    public static ClusterContext createEmptyContext() {
+        return new ClusterContextImpl();
+    }
+
+    /**
+     * Returns the context bound to the calling thread. When nothing is bound,
+     * an empty context is created, bound to the thread, and returned.
+     *
+     * @return the calling thread's context
+     */
+    public static ClusterContext getContext() {
+        final ClusterContext existing = contextHolder.get();
+        if (existing != null) {
+            return existing;
+        }
+
+        final ClusterContext created = createEmptyContext();
+        contextHolder.set(created);
+        return created;
+    }
+
+    /**
+     * Binds the given context to the calling thread, replacing any previous one.
+     *
+     * @param context the context to bind
+     */
+    public static void setContext(final ClusterContext context) {
+        contextHolder.set(context);
+    }
+
+    /**
+     * Removes whatever context is bound to the calling thread.
+     */
+    public static void removeContext() {
+        contextHolder.remove();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
 
b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
new file mode 100644
index 0000000..90b8a37
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web;
+
+import org.apache.nifi.cluster.context.ClusterContext;
+import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+
+/**
+ * Optimistic locking manager for a clustered environment. Revision lookups
+ * prefer the revision held by the calling thread's {@link ClusterContext} and
+ * fall back to the wrapped manager; revision updates are written to both.
+ * Last-modifier handling is delegated to the wrapped manager.
+ */
+public class ClusterAwareOptimisticLockingManager implements OptimisticLockingManager {
+
+    /** The wrapped manager consulted when the thread context holds no revision. */
+    private final OptimisticLockingManager delegate;
+
+    /**
+     * @param optimisticLockingManager the manager to wrap
+     */
+    public ClusterAwareOptimisticLockingManager(final OptimisticLockingManager optimisticLockingManager) {
+        this.delegate = optimisticLockingManager;
+    }
+
+    /**
+     * Verifies the given revision against the current one.
+     *
+     * @param revision the revision supplied by the client
+     * @return the given revision incremented with its own client id
+     * @throws InvalidRevisionException if the given revision does not equal the current revision
+     */
+    @Override
+    public Revision checkRevision(Revision revision) throws InvalidRevisionException {
+        final Revision expected = getRevision();
+        if (expected.equals(revision)) {
+            return revision.increment(revision.getClientId());
+        }
+        throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, expected));
+    }
+
+    /**
+     * @param revision the revision to compare
+     * @return true if the current revision equals the given one
+     */
+    @Override
+    public boolean isCurrent(Revision revision) {
+        final Revision expected = getRevision();
+        return expected.equals(revision);
+    }
+
+    /**
+     * @return the thread context's revision when one is set, otherwise the wrapped manager's revision
+     */
+    @Override
+    public Revision getRevision() {
+        final ClusterContext threadContext = ClusterContextThreadLocal.getContext();
+        final boolean contextHasRevision = threadContext != null && threadContext.getRevision() != null;
+        return contextHasRevision ? threadContext.getRevision() : delegate.getRevision();
+    }
+
+    /**
+     * Stores the revision on the thread context (when present) and then on the wrapped manager.
+     *
+     * @param revision the new revision
+     */
+    @Override
+    public void setRevision(final Revision revision) {
+        final ClusterContext threadContext = ClusterContextThreadLocal.getContext();
+        if (threadContext != null) {
+            threadContext.setRevision(revision);
+        }
+        delegate.setRevision(revision);
+    }
+
+    /**
+     * Increments the current revision and stores the result.
+     *
+     * @return the incremented revision
+     */
+    @Override
+    public Revision incrementRevision() {
+        final Revision next = getRevision().increment();
+        setRevision(next);
+        return next;
+    }
+
+    /**
+     * Increments the current revision on behalf of a client and stores the result.
+     *
+     * @param clientId the client id passed to the increment
+     * @return the incremented revision
+     */
+    @Override
+    public Revision incrementRevision(final String clientId) {
+        final Revision next = getRevision().increment(clientId);
+        setRevision(next);
+        return next;
+    }
+
+    @Override
+    public String getLastModifier() {
+        return delegate.getLastModifier();
+    }
+
+    @Override
+    public void setLastModifier(final String lastModifier) {
+        delegate.setLastModifier(lastModifier);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/.gitignore
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/.gitignore 
b/nar-bundles/framework-bundle/framework/cluster/.gitignore
new file mode 100755
index 0000000..ea8c4bf
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/pom.xml 
b/nar-bundles/framework-bundle/framework/cluster/pom.xml
new file mode 100644
index 0000000..ad5dda7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-framework-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>framework-cluster</artifactId>
+    <packaging>jar</packaging>
+    <name>NiFi Framework Cluster</name>
+    <description>The clustering software for NiFi.</description>
+    <dependencies>
+        
+        <!-- application core dependencies -->
+        
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-logging-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-file-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>client-dto</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>framework-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>core-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>framework-cluster-protocol</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>framework-cluster-web</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-administration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>site-to-site</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.9</version>
+        </dependency>
+        
+        <!-- third party dependencies -->
+        
+        <!-- sun dependencies -->
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>javax.servlet-api</artifactId>
+        </dependency>
+        
+        <!-- commons dependencies -->
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+        </dependency>
+        
+        <!-- jersey dependencies -->
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+        </dependency>
+        
+        <!-- spring dependencies -->
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-beans</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+        </dependency>
+        
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java
new file mode 100644
index 0000000..183c7ca
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple multicast test client that sends ping messages to a group address.
+ * <p>
+ * The group and port are read from the system properties {@code group} and
+ * {@code port} (defaults {@code 225.0.0.0} and {@code 2222}). A listener logs
+ * each ping it receives while a daemon timer sends a new ping every
+ * {@code PING_DELAY_SECONDS} seconds, until input arrives on standard input.
+ */
+public class MulticastTestClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(MulticastTestClient.class);
+
+    private static final int PING_DELAY_SECONDS = 3;
+
+    /**
+     * Runs the test client until a byte is read from standard input.
+     *
+     * @param args ignored; configuration comes from system properties
+     * @throws IOException if listening, reading standard input, or stopping the listener fails
+     */
+    public static void main(final String... args) throws IOException {
+
+        // getProperty with a non-null default never returns null, so only emptiness needs checking
+        final String group = System.getProperty("group", "225.0.0.0").trim();
+        if (group.length() == 0) {
+            System.out.println("Host system property 'group' must be non-empty.");
+            return;
+        }
+
+        final String portStr = System.getProperty("port", "2222");
+        final int port;
+        try {
+            port = Integer.parseInt(portStr);
+        } catch (final NumberFormatException nfe) {
+            System.out.println("Port system property 'port' was not a valid port.");
+            return;
+        }
+
+        logger.info(String.format("Pinging every %s seconds using multicast address: %s:%s.", PING_DELAY_SECONDS, group, port));
+        logger.info("Override defaults by using system properties '-Dgroup=<Class D IP>' and '-Dport=<unused port>'.");
+        logger.info("The test client may be stopped by entering a newline at the command line.");
+
+        final InetSocketAddress addr = new InetSocketAddress(group, port);
+        final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT);
+        final MulticastConfiguration multicastConfig = new MulticastConfiguration();
+        multicastConfig.setReuseAddress(true);
+
+        // setup listener
+        final MulticastProtocolListener listener = new MulticastProtocolListener(1, addr, multicastConfig, protocolContext);
+        listener.addHandler(new ProtocolHandler() {
+            @Override
+            public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+                // NOTE(review): canHandle accepts every message, so a non-ping message
+                // would fail this cast - confirm only pings are sent to the group.
+                final PingMessage pingMsg = (PingMessage) msg;
+                final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
+                logger.info("Pinged at: " + sdf.format(pingMsg.getDate()));
+                return null;
+            }
+
+            @Override
+            public boolean canHandle(ProtocolMessage msg) {
+                return true;
+            }
+        });
+
+        // setup socket
+        final MulticastSocket multicastSocket = MulticastUtils.createMulticastSocket(multicastConfig);
+
+        // setup broadcaster; the second argument runs the timer thread as a daemon
+        final Timer broadcaster = new Timer("Multicast Test Client", true);
+
+        try {
+
+            // start listening
+            listener.start();
+
+            // start broadcasting
+            broadcaster.schedule(new TimerTask() {
+
+                @Override
+                public void run() {
+                    try {
+
+                        final PingMessage msg = new PingMessage();
+                        msg.setDate(new Date());
+
+                        // marshal message to output stream
+                        final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                        marshaller.marshal(msg, baos);
+                        final byte[] packetBytes = baos.toByteArray();
+
+                        // send message
+                        final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, addr);
+                        multicastSocket.send(packet);
+
+                    } catch (final Exception ex) {
+                        logger.warn("Failed to send message due to: " + ex, ex);
+                    }
+                }
+            }, 0, PING_DELAY_SECONDS * 1000);
+
+            // block until any input is received
+            System.in.read();
+
+        } finally {
+            // stop scheduling pings first, then release the sending socket so it is
+            // closed even if stopping the listener throws
+            broadcaster.cancel();
+            multicastSocket.close();
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java
new file mode 100644
index 0000000..6bc5d6c
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.event;
+
+import java.util.Date;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * An Event records that something noteworthy happened. Each event carries the
+ * source it originated from, the time it occurred, a descriptive message, and
+ * a category. All fields are final and set once at construction, so instances
+ * are immutable.
+ *
+ * @author unattributed
+ */
+public class Event {
+
+    /**
+     * The category assigned to an event.
+     */
+    public static enum Category {
+
+        DEBUG,
+        INFO,
+        WARN
+    }
+
+    private final String source;
+
+    private final String message;
+
+    private final Category category;
+
+    private final long timestamp;
+
+    /**
+     * Creates an "INFO" event stamped with the current time.
+     *
+     * @param source the source
+     * @param message the description
+     */
+    public Event(final String source, final String message) {
+        this(source, message, Category.INFO, System.currentTimeMillis());
+    }
+
+    /**
+     * Creates an event of the given category stamped with the current time.
+     *
+     * @param source the source
+     * @param message the description
+     * @param category the event category
+     */
+    public Event(final String source, final String message, final Category category) {
+        this(source, message, category, System.currentTimeMillis());
+    }
+
+    /**
+     * Creates an "INFO" event with the given timestamp.
+     *
+     * @param source the source
+     * @param message the description
+     * @param timestamp the time of occurrence
+     */
+    public Event(final String source, final String message, final long timestamp) {
+        this(source, message, Category.INFO, timestamp);
+    }
+
+    /**
+     * Creates an event. All other constructors delegate here, so this is the
+     * single place where arguments are validated.
+     *
+     * @param source the source; may not be blank
+     * @param message the description; may not be blank
+     * @param category the event category; may not be null
+     * @param timestamp the time of occurrence; may not be negative
+     *
+     * @throws IllegalArgumentException if any argument violates the
+     * constraints above
+     */
+    public Event(final String source, final String message, final Category category, final long timestamp) {
+
+        // arguments are checked in declaration order; the first violation wins
+        if (StringUtils.isBlank(source)) {
+            throw new IllegalArgumentException("Source may not be empty or null.");
+        }
+        if (StringUtils.isBlank(message)) {
+            throw new IllegalArgumentException("Event message may not be empty or null.");
+        }
+        if (category == null) {
+            throw new IllegalArgumentException("Event category may not be null.");
+        }
+        if (timestamp < 0) {
+            throw new IllegalArgumentException("Timestamp may not be negative: " + timestamp);
+        }
+
+        this.source = source;
+        this.message = message;
+        this.category = category;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * @return the source of this event
+     */
+    public String getSource() {
+        return source;
+    }
+
+    /**
+     * @return the description of this event
+     */
+    public String getMessage() {
+        return message;
+    }
+
+    /**
+     * @return the category of this event
+     */
+    public Category getCategory() {
+        return category;
+    }
+
+    /**
+     * @return the time of occurrence of this event
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
new file mode 100644
index 0000000..f9dfb00
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.event;
+
+import java.util.List;
+
+/**
+ * Manages an ordered list of events. The event history size dictates the total
+ * number of events to manage for a given source at a given time. When the size
+ * is exceeded, the oldest event for that source is evicted.
+ *
+ * @author unattributed
+ */
+public interface EventManager {
+
+    /**
+     * Adds an event to the manager.
+     *
+     * @param event an Event
+     */
+    void addEvent(Event event);
+
+    /**
+     * Returns a list of events for a given source sorted by the event's
+     * timestamp where the most recent event is first in the list.
+     *
+     * @param eventSource the source
+     *
+     * @return the list of events
+     */
+    List<Event> getEvents(String eventSource);
+
+    /**
+     * Returns the most recent event for the source. If no events exist, then
+     * null is returned.
+     *
+     * @param eventSource the source
+     *
+     * @return the most recent event for the source, or null if no events
+     * exist
+     */
+    Event getMostRecentEvent(String eventSource);
+
+    /**
+     * Clears all events for the given source.
+     *
+     * @param eventSource the source
+     */
+    void clearEventHistory(String eventSource);
+
+    /**
+     * Returns the history size.
+     *
+     * @return the history size
+     */
+    int getEventHistorySize();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
new file mode 100644
index 0000000..7fadc78
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.event.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.event.EventManager;
+
+/**
+ * Implements the EventManager using an in-memory map from event source to a
+ * bounded priority queue of that source's events.
+ *
+ * This class performs no synchronization of its own.
+ *
+ * @author unattributed
+ */
+public class EventManagerImpl implements EventManager {
+
+    /**
+     * associates the source ID with a priority queue of events whose head is
+     * the most recent event
+     */
+    private final Map<String, Queue<Event>> eventsMap = new HashMap<>();
+
+    /**
+     * the number of events to maintain for a given source
+     */
+    private final int eventHistorySize;
+
+    /**
+     * Creates an instance.
+     *
+     * @param eventHistorySize the number of events to manage for a given
+     * source. Value must be positive.
+     *
+     * @throws IllegalArgumentException if the history size is not positive
+     */
+    public EventManagerImpl(final int eventHistorySize) {
+        if (eventHistorySize <= 0) {
+            throw new IllegalArgumentException("Event history size must be positive: " + eventHistorySize);
+        }
+        this.eventHistorySize = eventHistorySize;
+    }
+
+    /**
+     * Adds the event to its source's history, evicting the oldest event of
+     * that source if the history size is exceeded.
+     *
+     * @param event the event to add; may not be null
+     *
+     * @throws IllegalArgumentException if the event is null
+     */
+    @Override
+    public void addEvent(final Event event) {
+
+        if (event == null) {
+            throw new IllegalArgumentException("Event may not be null.");
+        }
+
+        Queue<Event> events = eventsMap.get(event.getSource());
+        if (events == null) {
+            // no events from this source, so add a new queue to the map
+            events = new PriorityQueue<>(eventHistorySize, createEventComparator());
+            eventsMap.put(event.getSource(), events);
+        }
+
+        // add event
+        events.add(event);
+
+        // if we exceeded the history size, then evict the oldest event
+        if (events.size() > eventHistorySize) {
+            removeOldestEvent(events);
+        }
+
+    }
+
+    /**
+     * Returns an unmodifiable snapshot of the events for the given source,
+     * most recent first.
+     *
+     * @param eventSource the source
+     *
+     * @return the sorted events, or an empty list if the source has none
+     */
+    @Override
+    public List<Event> getEvents(final String eventSource) {
+        final Queue<Event> events = eventsMap.get(eventSource);
+        if (events == null) {
+            return Collections.<Event>emptyList();
+        }
+
+        // a PriorityQueue iterates in heap order, not priority order, so the
+        // copy must be sorted explicitly to honor the most-recent-first contract
+        final List<Event> sortedEvents = new ArrayList<>(events);
+        Collections.sort(sortedEvents, createEventComparator());
+        return Collections.unmodifiableList(sortedEvents);
+    }
+
+    @Override
+    public int getEventHistorySize() {
+        return eventHistorySize;
+    }
+
+    /**
+     * Returns the head of the source's queue, which is the most recent event.
+     *
+     * @param eventSource the source
+     *
+     * @return the most recent event, or null if the source has no events
+     */
+    @Override
+    public Event getMostRecentEvent(final String eventSource) {
+        final Queue<Event> events = eventsMap.get(eventSource);
+        if (events == null) {
+            return null;
+        } else {
+            return events.peek();
+        }
+    }
+
+    @Override
+    public void clearEventHistory(final String eventSource) {
+        eventsMap.remove(eventSource);
+    }
+
+    /**
+     * Creates a comparator that orders events by most recent first.
+     *
+     * @return the comparator
+     */
+    private Comparator<Event> createEventComparator() {
+        return new Comparator<Event>() {
+            @Override
+            public int compare(final Event o1, final Event o2) {
+                // orders events by most recent first; Long.compare avoids the
+                // sign flip that casting a large long difference to int causes
+                return Long.compare(o2.getTimestamp(), o1.getTimestamp());
+            }
+        };
+    }
+
+    /**
+     * Removes the event with the smallest timestamp from the collection. Does
+     * nothing if the collection is empty.
+     *
+     * @param events the events to evict from
+     */
+    private void removeOldestEvent(final Collection<Event> events) {
+
+        if (events.isEmpty()) {
+            return;
+        }
+
+        // the queue's head is the newest event, so the oldest one has to be
+        // found with a linear scan
+        Event oldestEvent = null;
+        for (final Event event : events) {
+            if (oldestEvent == null || oldestEvent.getTimestamp() > event.getTimestamp()) {
+                oldestEvent = event;
+            }
+        }
+
+        events.remove(oldestEvent);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
new file mode 100644
index 0000000..2e3d278
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.firewall;
+
+/**
+ * Defines the interface for restricting external client connections to a set 
of
+ * hosts or IPs.
+ */
+public interface ClusterNodeFirewall {
+
+    /**
+     * Returns true if the given host or IP is permissible through the
+     * firewall; false otherwise.
+     *
+     * If an IP is given, then it must be formatted in dotted decimal notation.
+     *
+     * @param hostOrIp the host name or IP to check
+     * @return true if the host or IP is permissible through the firewall;
+     * false otherwise
+     */
+    boolean isPermissible(String hostOrIp);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
new file mode 100644
index 0000000..bcee661
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.firewall.impl;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import org.apache.commons.net.util.SubnetUtils;
+import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
+import org.apache.nifi.file.FileUtils;
+import org.apache.nifi.logging.NiFiLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file-based implementation of the ClusterNodeFirewall interface. The class
+ * is configured with a file. If the file is empty, then everything is
+ * permissible. Otherwise, the file should contain hostnames or IPs formatted
+ * as dotted decimals with an optional CIDR suffix. Each entry must be
+ * separated by a newline. An example configuration is given below:
+ *
+ * <code>
+ * # hash character is a comment delimiter
+ * 1.2.3.4         # exact IP
+ * some.host.name  # a host name
+ * 4.5.6.7/8       # range of CIDR IPs
+ * 9.10.11.12/13   # a smaller range of CIDR IPs
+ * </code>
+ *
+ * This class allows for synchronization with an optionally configured restore
+ * directory. If configured, then at startup, if either the config file or the
+ * restore directory's copy is missing, then the configuration file will be
+ * copied to the appropriate location. If the restore directory contains a
+ * copy that is different in content to the configuration file, then an
+ * exception is thrown at construction time.
+ */
+public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall {
+
+    private final File config;
+
+    private final File restoreDirectory;
+
+    /**
+     * the permitted subnets parsed from the configuration file; an empty
+     * collection means everything is permitted
+     */
+    private final Collection<SubnetUtils.SubnetInfo> subnetInfos = new ArrayList<>();
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(FileBasedClusterNodeFirewall.class));
+
+    /**
+     * Creates a firewall from the given configuration file without a restore
+     * directory.
+     *
+     * @param config the firewall configuration file; may not be null
+     *
+     * @throws IOException if the configuration could not be created or read
+     */
+    public FileBasedClusterNodeFirewall(final File config) throws IOException {
+        this(config, null);
+    }
+
+    /**
+     * Creates a firewall from the given configuration file, first
+     * synchronizing it with the restore directory if one is given. The
+     * configuration file is created if it does not exist.
+     *
+     * @param config the firewall configuration file; may not be null
+     * @param restoreDirectory the restore directory; may be null, in which
+     * case no synchronization is performed
+     *
+     * @throws IOException if the configuration could not be created or read
+     * @throws IllegalArgumentException if the configuration file is null
+     */
+    public FileBasedClusterNodeFirewall(final File config, final File restoreDirectory) throws IOException {
+
+        if (config == null) {
+            throw new IllegalArgumentException("Firewall configuration file may not be null.");
+        }
+
+        this.config = config;
+        this.restoreDirectory = restoreDirectory;
+
+        if (restoreDirectory != null) {
+            // synchronize with restore directory
+            try {
+                syncWithRestoreDirectory();
+            } catch (final IOException ioe) {
+                // NOTE(review): rethrown unchecked even though this constructor
+                // declares IOException; kept as-is so existing callers see the
+                // same exception type
+                throw new RuntimeException(ioe);
+            }
+        }
+
+        if (!config.exists() && !config.createNewFile()) {
+            throw new IOException("Firewall configuration file did not exist and could not be created: " + config.getAbsolutePath());
+        }
+
+        logger.info("Loading cluster firewall configuration.");
+        parseConfig(config);
+        logger.info("Cluster firewall configuration loaded.");
+    }
+
+    /**
+     * Returns true if no rules are configured or if the given host or IP
+     * resolves to an address inside one of the configured subnets. Unknown
+     * hosts and arguments rejected with an IllegalArgumentException are
+     * blocked.
+     *
+     * @param hostOrIp the host name or IP to check
+     * @return true if the host or IP is permitted; false otherwise
+     */
+    @Override
+    public boolean isPermissible(final String hostOrIp) {
+        try {
+
+            // if no rules, then permit everything
+            if (subnetInfos.isEmpty()) {
+                return true;
+            }
+
+            final String ip;
+            try {
+                ip = InetAddress.getByName(hostOrIp).getHostAddress();
+            } catch (final UnknownHostException uhe) {
+                logger.warn("Blocking unknown host: " + hostOrIp, uhe);
+                return false;
+            }
+
+            // check each subnet to see if IP is in range
+            for (final SubnetUtils.SubnetInfo subnetInfo : subnetInfos) {
+                if (subnetInfo.isInRange(ip)) {
+                    return true;
+                }
+            }
+
+            // no match
+            return false;
+
+        } catch (final IllegalArgumentException iae) {
+            return false;
+        }
+    }
+
+    /**
+     * Verifies the restore directory is usable and distinct from the
+     * configuration file's directory, then synchronizes the configuration
+     * file with the restore directory's copy of the same name.
+     *
+     * @throws IOException if the restore directory is not accessible or the
+     * synchronization fails
+     * @throws IllegalStateException if the configuration file resides in the
+     * restore directory
+     */
+    private void syncWithRestoreDirectory() throws IOException {
+
+        // sanity check that restore directory is a directory, creating it if necessary
+        FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
+
+        // check that restore directory is not the same as the primary directory;
+        // the parent is resolved from the absolute file because getParentFile()
+        // returns null for a relative path that has no parent component
+        final File configDirectory = config.getAbsoluteFile().getParentFile();
+        if (configDirectory != null && configDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
+            throw new IllegalStateException(
+                    String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ",
+                            config.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
+        }
+
+        // the restore copy will have same file name, but reside in a different directory
+        final File restoreFile = new File(restoreDirectory, config.getName());
+
+        // sync the primary copy with the restore copy
+        FileUtils.syncWithRestore(config, restoreFile, logger);
+
+    }
+
+    /**
+     * Replaces the current rules with those read from the given file. Host
+     * names are resolved to IPs here, at parse time. Lines that cannot be
+     * resolved or are not valid CIDR addresses are logged and skipped.
+     *
+     * @param config the configuration file to read
+     *
+     * @throws IOException if the file could not be read
+     */
+    private void parseConfig(final File config) throws IOException {
+
+        // clear old information
+        subnetInfos.clear();
+        try (BufferedReader br = new BufferedReader(new FileReader(config))) {
+
+            String ipOrHostLine;
+            String ipCidr;
+            int totalIpsAdded = 0;
+            while ((ipOrHostLine = br.readLine()) != null) {
+
+                // cleanup whitespace
+                ipOrHostLine = ipOrHostLine.trim();
+
+                if (ipOrHostLine.isEmpty() || ipOrHostLine.startsWith("#")) {
+                    // skip empty lines or comments
+                    continue;
+                } else if (ipOrHostLine.contains("#")) {
+                    // parse out comments in IP containing lines
+                    ipOrHostLine = ipOrHostLine.substring(0, ipOrHostLine.indexOf("#")).trim();
+                }
+
+                // if given a complete IP, then convert to CIDR
+                if (ipOrHostLine.contains("/")) {
+                    ipCidr = ipOrHostLine;
+                } else if (ipOrHostLine.contains("\\")) {
+                    logger.warn("CIDR IP notation uses forward slashes '/'.  Replacing backslash '\\' with forward slash'/' for '" + ipOrHostLine + "'");
+                    ipCidr = ipOrHostLine.replace("\\", "/");
+                } else {
+                    try {
+                        ipCidr = InetAddress.getByName(ipOrHostLine).getHostAddress();
+                        if (!ipOrHostLine.equals(ipCidr)) {
+                            logger.debug(String.format("Resolved host '%s' to ip '%s'", ipOrHostLine, ipCidr));
+                        }
+                        // a /32 suffix restricts the subnet to the single address
+                        ipCidr += "/32";
+                        logger.debug("Adding CIDR to exact IP: " + ipCidr);
+                    } catch (final UnknownHostException uhe) {
+                        logger.warn("Firewall is skipping unknown host address: " + ipOrHostLine);
+                        continue;
+                    }
+                }
+
+                try {
+                    logger.debug("Adding CIDR IP to firewall: " + ipCidr);
+                    final SubnetUtils subnetUtils = new SubnetUtils(ipCidr);
+                    subnetUtils.setInclusiveHostCount(true);
+                    subnetInfos.add(subnetUtils.getInfo());
+                    totalIpsAdded++;
+                } catch (final IllegalArgumentException iae) {
+                    logger.warn("Firewall is skipping invalid CIDR address: " + ipOrHostLine);
+                }
+
+            }
+
+            if (totalIpsAdded == 0) {
+                logger.info("No IPs added to firewall.  Firewall will accept all requests.");
+            } else {
+                logger.info(String.format("Added %d IP(s) to firewall.  Only requests originating from the configured IPs will be accepted.", totalIpsAdded));
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
new file mode 100644
index 0000000..eedb88f
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+
+/**
+ * A dataflow with additional information about the cluster.
+ *
+ * @author unattributed
+ */
+public class ClusterDataFlow {
+
+    /**
+     * the identifier of the primary node, as given at construction
+     */
+    private final NodeIdentifier primaryNodeId;
+
+    /**
+     * the wrapped dataflow, as given at construction
+     */
+    private final StandardDataFlow dataFlow;
+
+    /**
+     * Creates an instance holding the given dataflow and primary node
+     * identifier. Neither argument is validated or copied.
+     *
+     * @param dataFlow the dataflow
+     * @param primaryNodeId the identifier of the primary node
+     */
+    public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) {
+        this.primaryNodeId = primaryNodeId;
+        this.dataFlow = dataFlow;
+    }
+
+    /**
+     * @return the dataflow given at construction
+     */
+    public StandardDataFlow getDataFlow() {
+        return dataFlow;
+    }
+
+    /**
+     * @return the primary node identifier given at construction
+     */
+    public NodeIdentifier getPrimaryNodeId() {
+        return primaryNodeId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
new file mode 100644
index 0000000..6ff15a7
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow;
+
+/**
+ * A base exception for data access exceptions.
+ *
+ * @author unattributed
+ */
+public class DaoException extends RuntimeException {
+
+    /**
+     * Creates an exception with the given detail message and cause.
+     *
+     * @param msg the detail message
+     * @param cause the underlying cause
+     */
+    public DaoException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+    /**
+     * Creates an exception with the given cause.
+     *
+     * @param cause the underlying cause
+     */
+    public DaoException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with the given detail message.
+     *
+     * @param msg the detail message
+     */
+    public DaoException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception with no detail message and no cause.
+     */
+    public DaoException() {
+        super();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
new file mode 100644
index 0000000..a273704
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow;
+
+/**
+ * A data access object for loading and saving the flow managed by the cluster.
+ *
+ * @author unattributed
+ */
+public interface DataFlowDao {
+
+    /**
+     * Loads the cluster's dataflow.
+     *
+     * @return the dataflow or null if no dataflow exists
+     *
+     * @throws DaoException if the dataflow was unable to be loaded
+     */
+    ClusterDataFlow loadDataFlow() throws DaoException;
+
+    /**
+     * Saves the cluster's dataflow.
+     *
+     * @param dataFlow the dataflow to save
+     *
+     * @throws DaoException if the dataflow was unable to be saved
+     */
+    void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException;
+
+    /**
+     * Sets the state of the dataflow. If the dataflow does not exist, then an
+     * exception is thrown.
+     *
+     * @param flowState the state of the dataflow
+     *
+     * @throws DaoException if the state was unable to be updated
+     */
+    void setPersistedFlowState(PersistedFlowState flowState) throws 
DaoException;
+
+    /**
+     * Gets the state of the dataflow.
+     *
+     * @return the state of the dataflow
+     *
+     * @throws DaoException if the state was unable to be retrieved
+     */
+    PersistedFlowState getPersistedFlowState() throws DaoException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
new file mode 100644
index 0000000..339d904
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow;
+
+import java.util.Set;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * A service for managing the cluster's flow. The service will attempt to keep
+ * the cluster's dataflow current while respecting the value of the configured
+ * retrieval delay.
+ *
+ * The eligible retrieval time is reset with the configured delay every time the
+ * flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then
+ * the flow will not be retrieved.
+ *
+ * Clients must call start() and stop() to initialize and stop the instance.
+ *
+ * @author unattributed
+ */
+public interface DataFlowManagementService {
+
+    /**
+     * Starts the service. May only be called while the service is not
+     * running; see {@link #isRunning()}.
+     */
+    void start();
+
+    /**
+     * Stops the service. May only be called while the service is running.
+     */
+    void stop();
+
+    /**
+     * @return {@code true} if the instance is started; {@code false} otherwise
+     */
+    boolean isRunning();
+
+    /**
+     * Loads the dataflow.
+     *
+     * @return the dataflow, or {@code null} if no dataflow exists
+     */
+    ClusterDataFlow loadDataFlow();
+
+    /**
+     * Updates the dataflow with the given primary node identifier.
+     *
+     * @param nodeId the identifier of the primary node
+     *
+     * @throws DaoException if the update failed
+     */
+    void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
+
+    /**
+     * Sets the state of the flow.
+     *
+     * @param flowState the state to set
+     *
+     * @see PersistedFlowState
+     */
+    void setPersistedFlowState(PersistedFlowState flowState);
+
+    /**
+     * @return the state of the flow
+     */
+    PersistedFlowState getPersistedFlowState();
+
+    /**
+     * @return {@code true} if the flow is current; {@code false} otherwise
+     */
+    boolean isFlowCurrent();
+
+    /**
+     * Sets the node identifiers to use when attempting to retrieve the flow.
+     *
+     * @param nodeIds the node identifiers
+     */
+    void setNodeIds(Set<NodeIdentifier> nodeIds);
+
+    /**
+     * Returns the node identifiers used when attempting to retrieve the flow;
+     * see {@link #setNodeIds(Set)}.
+     *
+     * @return the set of node identifiers the service is using to retrieve the
+     * flow
+     */
+    Set<NodeIdentifier> getNodeIds();
+
+    /**
+     * @return the retrieval delay in seconds
+     */
+    int getRetrievalDelaySeconds();
+
+    /**
+     * Sets the retrieval delay. NOTE(review): the delay is passed as a String
+     * but read back as int seconds; confirm the format implementations expect.
+     * @param delay the retrieval delay in seconds
+     */
+    void setRetrievalDelay(String delay);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
new file mode 100644
index 0000000..b3afc6e
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow;
+
+/**
+ * Represents the various states of a flow managed by the cluster.
+ *
+ * The semantics of the values are:
+ * <ul>
+ * <li> CURRENT - the flow is current </li>
+ * <li> STALE - the flow is not current, but is eligible to be updated. </li>
+ * <li> UNKNOWN - the flow is not current and is not eligible to be updated.
+ * </li>
+ * </ul>
+ *
+ * @author unattributed
+ */
+public enum PersistedFlowState {
+
+    CURRENT, // the flow is current
+    STALE, // the flow is not current, but is eligible to be updated
+    UNKNOWN // the flow is not current and is not eligible to be updated
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
new file mode 100644
index 0000000..ce5a08b
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow;
+
+/**
+ * Represents the exceptional case when a caller is requesting the current flow,
+ * but a current flow is not available.
+ *
+ * @author unattributed
+ */
+public class StaleFlowException extends RuntimeException {
+    /** Creates the exception with a detail message and the underlying cause. */
+    public StaleFlowException(String message, Throwable cause) {
+        super(message, cause);
+    }
+    /** Creates the exception with a detail message and no cause. */
+    public StaleFlowException(String message) {
+        super(message);
+    }
+    /** Creates the exception wrapping the underlying cause. */
+    public StaleFlowException(Throwable cause) {
+        super(cause);
+    }
+    /** Creates the exception with neither a detail message nor a cause. */
+    public StaleFlowException() {
+    }
+
+}

Reply via email to