http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserGroupEntity.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserGroupEntity.java
 
b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserGroupEntity.java
new file mode 100644
index 0000000..a6542c8
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserGroupEntity.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.web.api.dto.UserGroupDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body 
of
+ * a request or response to or from the API. This particular entity holds a
+ * reference to a UserGroupDTO.
+ */
+@XmlRootElement(name = "userGroupEntity")
+public class UserGroupEntity extends Entity {
+
+    private UserGroupDTO userGroup;
+
+    /**
+     * The UserGroupDTO that is being serialized.
+     *
+     * @return The UserGroupDTO object
+     */
+    public UserGroupDTO getUserGroup() {
+        return userGroup;
+    }
+
+    public void setUserGroup(UserGroupDTO userGroup) {
+        this.userGroup = userGroup;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserSearchResultsEntity.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserSearchResultsEntity.java
 
b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserSearchResultsEntity.java
new file mode 100644
index 0000000..baffe15
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserSearchResultsEntity.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import java.util.List;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.web.api.dto.search.UserGroupSearchResultDTO;
+import org.apache.nifi.web.api.dto.search.UserSearchResultDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body 
of
+ * a request or response to or from the API. This particular entity holds a
+ * reference to UserSearchResultDTOs and UserGroupSearchResultDTOs.
+ */
+@XmlRootElement(name = "userSearchResultsEntity")
+public class UserSearchResultsEntity {
+
+    private List<UserSearchResultDTO> userResults;
+    private List<UserGroupSearchResultDTO> userGroupResults;
+
+    /**
+     * The user search results.
+     *
+     * @return
+     */
+    public List<UserSearchResultDTO> getUserResults() {
+        return userResults;
+    }
+
+    public void setUserResults(List<UserSearchResultDTO> userResults) {
+        this.userResults = userResults;
+    }
+
+    /**
+     * The user group search results.
+     *
+     * @return
+     */
+    public List<UserGroupSearchResultDTO> getUserGroupResults() {
+        return userGroupResults;
+    }
+
+    public void setUserGroupResults(List<UserGroupSearchResultDTO> 
userGroupResults) {
+        this.userGroupResults = userGroupResults;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UsersEntity.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UsersEntity.java
 
b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UsersEntity.java
new file mode 100644
index 0000000..2d11d1f
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UsersEntity.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import java.util.Collection;
+import java.util.Date;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.web.api.dto.UserDTO;
+import org.apache.nifi.web.api.dto.util.TimeAdapter;
+
+/**
+ * A serialized representation of this class can be placed in the entity body 
of
+ * a request or response to or from the API. This particular entity holds a
+ * reference to a collection of UserDTO.
+ */
+@XmlRootElement(name = "usersEntity")
+public class UsersEntity extends Entity {
+
+    private Collection<UserDTO> users;
+    private Date generated;
+
+    /**
+     * The collection of UserDTOs that are being serialized.
+     *
+     * @return The UserDTO object
+     */
+    public Collection<UserDTO> getUsers() {
+        return users;
+    }
+
+    public void setUsers(Collection<UserDTO> users) {
+        this.users = users;
+    }
+
+    /**
+     * When this content was generated.
+     *
+     * @return
+     */
+    @XmlJavaTypeAdapter(TimeAdapter.class)
+    public Date getGenerated() {
+        return generated;
+    }
+
+    public void setGenerated(Date generated) {
+        this.generated = generated;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/.gitignore
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/.gitignore
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/.gitignore
new file mode 100755
index 0000000..ea8c4bf
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/pom.xml
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/pom.xml 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/pom.xml
new file mode 100644
index 0000000..b8960c3
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-framework-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>cluster-authorization-provider</artifactId>
+    <name>NiFi Framework Cluster Authority Provider</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>file-authorization-provider</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>framework-cluster-protocol</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>framework-cluster</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-socket-utils</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/ClusterManagerAuthorizationProvider.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/ClusterManagerAuthorizationProvider.java
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/ClusterManagerAuthorizationProvider.java
new file mode 100644
index 0000000..2b3b38c
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/ClusterManagerAuthorizationProvider.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.authorization;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import org.apache.nifi.authorization.AuthorityProvider;
+import org.apache.nifi.authorization.AuthorityProviderConfigurationContext;
+import org.apache.nifi.authorization.AuthorityProviderInitializationContext;
+import org.apache.nifi.authorization.FileAuthorizationProvider;
+import org.apache.nifi.authorization.annotation.AuthorityProviderContext;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.authorization.exception.ProviderDestructionException;
+import 
org.apache.nifi.cluster.authorization.protocol.message.DoesDnExistMessage;
+import 
org.apache.nifi.cluster.authorization.protocol.message.GetAuthoritiesMessage;
+import 
org.apache.nifi.cluster.authorization.protocol.message.GetGroupForUserMessage;
+import org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage;
+import static 
org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage.MessageType.DOES_DN_EXIST;
+import static 
org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage.MessageType.GET_AUTHORITIES;
+import static 
org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage.MessageType.GET_GROUP_FOR_USER;
+import 
org.apache.nifi.cluster.authorization.protocol.message.jaxb.JaxbProtocolUtils;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketListener;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.util.NiFiProperties;
+import static org.apache.nifi.util.NiFiProperties.CLUSTER_MANAGER_ADDRESS;
+import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Provides authorities for the NCM in clustered environments. Communication
+ * occurs over TCP/IP sockets. All method calls are deferred to the
+ * FileAuthorizationProvider.
+ */
+public class ClusterManagerAuthorizationProvider extends 
FileAuthorizationProvider implements AuthorityProvider, ApplicationContextAware 
{
+
+    public static final String AUTHORITY_PROVIDER_SERVIVE_NAME = 
"cluster-authority-provider";
+
+    private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(ClusterManagerAuthorizationProvider.class));
+    private static final String CLUSTER_MANAGER_AUTHORITY_PROVIDER_PORT = 
"Authority Provider Port";
+    private static final String CLUSTER_MANAGER_AUTHORITY_PROVIDER_THREADS = 
"Authority Provider Threads";
+    private static final int 
DEFAULT_CLUSTER_MANAGER_AUTHORITY_PROVIDER_THREADS = 10;
+
+    private WebClusterManager clusterManager;
+    private ProtocolContext<ProtocolMessage> authorityProviderProtocolContext;
+    private SocketListener socketListener;
+    private NiFiProperties properties;
+    private ApplicationContext applicationContext;
+
+    @Override
+    public void initialize(final AuthorityProviderInitializationContext 
initializationContext) throws ProviderCreationException {
+        super.initialize(initializationContext);
+    }
+
+    @Override
+    public void onConfigured(final AuthorityProviderConfigurationContext 
configurationContext) throws ProviderCreationException {
+        super.onConfigured(configurationContext);
+
+        // get the socket address of the cluster authority provider
+        final InetSocketAddress clusterAuthorityProviderAddress = 
getClusterManagerAuthorityProviderAddress(configurationContext);
+
+        // get the cluster manager
+        clusterManager = applicationContext.getBean("clusterManager", 
WebClusterManager.class);
+
+        // if using multicast, then the authority provider's service is 
broadcasted
+        if (properties.getClusterProtocolUseMulticast()) {
+
+            // create the authority provider service for discovery
+            final DiscoverableService clusterAuthorityProviderService = new 
DiscoverableServiceImpl(AUTHORITY_PROVIDER_SERVIVE_NAME, 
clusterAuthorityProviderAddress);
+
+            // register the authority provider service with the cluster manager
+            
clusterManager.addBroadcastedService(clusterAuthorityProviderService);
+        }
+
+        // get the number of protocol listening thread
+        final int numThreads = 
getClusterManagerAuthorityProviderThreads(configurationContext);
+
+        // the server socket configuration
+        final ServerSocketConfiguration configuration = 
applicationContext.getBean("protocolServerSocketConfiguration", 
ServerSocketConfiguration.class);
+
+        // the authority provider listens for node messages
+        socketListener = new SocketListener(numThreads, 
clusterAuthorityProviderAddress.getPort(), configuration) {
+            @Override
+            public void dispatchRequest(final Socket socket) {
+                
ClusterManagerAuthorizationProvider.this.dispatchRequest(socket);
+            }
+        };
+
+        // start the socket listener
+        if (socketListener != null && !socketListener.isRunning()) {
+            try {
+                socketListener.start();
+            } catch (final IOException ioe) {
+                throw new ProviderCreationException("Failed to start Cluster 
Manager Authorization Provider due to: " + ioe, ioe);
+            }
+        }
+
+        // initialize the protocol context
+        authorityProviderProtocolContext = new 
JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT);
+    }
+
+    @Override
+    public void preDestruction() throws ProviderDestructionException {
+        if (socketListener != null && socketListener.isRunning()) {
+            try {
+                socketListener.stop();
+            } catch (final IOException ioe) {
+                throw new ProviderDestructionException("Failed to stop Cluster 
Manager Authorization Provider due to: " + ioe, ioe);
+            }
+        }
+        super.preDestruction();
+    }
+
+    private int getClusterManagerAuthorityProviderThreads(final 
AuthorityProviderConfigurationContext configurationContext) {
+        try {
+            return 
Integer.parseInt(configurationContext.getProperty(CLUSTER_MANAGER_AUTHORITY_PROVIDER_THREADS));
+        } catch (NumberFormatException nfe) {
+            return DEFAULT_CLUSTER_MANAGER_AUTHORITY_PROVIDER_THREADS;
+        }
+    }
+
+    private InetSocketAddress getClusterManagerAuthorityProviderAddress(final 
AuthorityProviderConfigurationContext configurationContext) {
+        try {
+            String socketAddress = 
properties.getProperty(CLUSTER_MANAGER_ADDRESS);
+            if (StringUtils.isBlank(socketAddress)) {
+                socketAddress = "localhost";
+            }
+            return InetSocketAddress.createUnresolved(socketAddress, 
getClusterManagerAuthorityProviderPort(configurationContext));
+        } catch (Exception ex) {
+            throw new RuntimeException("Invalid manager authority provider 
address/port due to: " + ex, ex);
+        }
+    }
+
+    private Integer getClusterManagerAuthorityProviderPort(final 
AuthorityProviderConfigurationContext configurationContext) {
+        final String authorityProviderPort = 
configurationContext.getProperty(CLUSTER_MANAGER_AUTHORITY_PROVIDER_PORT);
+        if (authorityProviderPort == null || 
authorityProviderPort.trim().isEmpty()) {
+            throw new ProviderCreationException("The authority provider port 
must be specified.");
+        }
+
+        return Integer.parseInt(authorityProviderPort);
+    }
+
+    private void dispatchRequest(final Socket socket) {
+        try {
+            // unmarshall message
+            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = 
authorityProviderProtocolContext.createUnmarshaller();
+            final ProtocolMessage request = 
unmarshaller.unmarshal(socket.getInputStream());
+            final ProtocolMessage response = request;
+
+            try {
+                switch (request.getType()) {
+                    case DOES_DN_EXIST: {
+                        final DoesDnExistMessage castedMsg = 
(DoesDnExistMessage) request;
+                        castedMsg.setResponse(doesDnExist(castedMsg.getDn()));
+                        break;
+                    }
+                    case GET_AUTHORITIES: {
+                        final GetAuthoritiesMessage castedMsg = 
(GetAuthoritiesMessage) request;
+                        
castedMsg.setResponse(getAuthorities(castedMsg.getDn()));
+                        break;
+                    }
+                    case GET_GROUP_FOR_USER: {
+                        final GetGroupForUserMessage castedMsg = 
(GetGroupForUserMessage) request;
+                        
castedMsg.setResponse(getGroupForUser(castedMsg.getDn()));
+                        break;
+                    }
+                    default: {
+                        throw new Exception("Unsupported Message Type: " + 
request.getType());
+                    }
+                }
+            } catch (final Exception ex) {
+                response.setExceptionClass(ex.getClass().getName());
+                response.setExceptionMessage(ex.getMessage());
+            }
+
+            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = 
authorityProviderProtocolContext.createMarshaller();
+            marshaller.marshal(response, socket.getOutputStream());
+
+        } catch (final Exception e) {
+            logger.warn("Failed processing Socket Authorization Provider 
protocol message due to " + e, e);
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    @Override
+    @AuthorityProviderContext
+    public void setApplicationContext(final ApplicationContext 
applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+    @Override
+    @AuthorityProviderContext
+    public void setNiFiProperties(NiFiProperties properties) {
+        super.setNiFiProperties(properties);
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/NodeAuthorizationProvider.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/NodeAuthorizationProvider.java
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/NodeAuthorizationProvider.java
new file mode 100644
index 0000000..c81e9d0
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/NodeAuthorizationProvider.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.authorization;
+
+import 
org.apache.nifi.cluster.authorization.protocol.message.DoesDnExistMessage;
+import 
org.apache.nifi.cluster.authorization.protocol.message.GetAuthoritiesMessage;
+import org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.authorization.Authority;
+import org.apache.nifi.authorization.AuthorityProvider;
+import org.apache.nifi.authorization.AuthorityProviderConfigurationContext;
+import org.apache.nifi.authorization.AuthorityProviderInitializationContext;
+import org.apache.nifi.authorization.annotation.AuthorityProviderContext;
+import org.apache.nifi.authorization.exception.AuthorityAccessException;
+import org.apache.nifi.authorization.exception.IdentityAlreadyExistsException;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.authorization.exception.ProviderDestructionException;
+import org.apache.nifi.authorization.exception.UnknownIdentityException;
+import 
org.apache.nifi.cluster.authorization.protocol.message.GetGroupForUserMessage;
+import 
org.apache.nifi.cluster.authorization.protocol.message.jaxb.JaxbProtocolUtils;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.util.NiFiProperties;
+import static 
org.apache.nifi.util.NiFiProperties.CLUSTER_NODE_UNICAST_MANAGER_ADDRESS;
+import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
+ * Provides authorities for nodes in clustered environments. Communication
+ * occurs over TCP/IP sockets. All method calls are communicated to the cluster
+ * manager provider via socket.
+ */
+public class NodeAuthorizationProvider implements AuthorityProvider, 
ApplicationContextAware {
+
+    private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(NodeAuthorizationProvider.class));
+    private static final String CLUSTER_NODE_MANAGER_AUTHORITY_PROVIDER_PORT = 
"Cluster Manager Authority Provider Port";
+
+    private ProtocolContext<ProtocolMessage> authorityProviderProtocolContext;
+    private SocketConfiguration socketConfiguration;
+    private ClusterServiceLocator serviceLocator;
+    private ApplicationContext applicationContext;
+    private NiFiProperties properties;
+
+    @Override
+    public void initialize(AuthorityProviderInitializationContext 
initializationContext) throws ProviderCreationException {
+    }
+
+    @Override
+    public void onConfigured(final AuthorityProviderConfigurationContext 
configurationContext) throws ProviderCreationException {
+        // TODO clear user cache?
+
+        // if using multicast, then the authority provider's service is 
broadcasted
+        if (properties.getClusterProtocolUseMulticast()) {
+            // create the service discovery
+            final ClusterServiceDiscovery serviceDiscovery = new 
ClusterServiceDiscovery(
+                    
ClusterManagerAuthorizationProvider.AUTHORITY_PROVIDER_SERVIVE_NAME,
+                    properties.getClusterProtocolMulticastAddress(),
+                    
applicationContext.getBean("protocolMulticastConfiguration", 
MulticastConfiguration.class),
+                    applicationContext.getBean("protocolContext", 
ProtocolContext.class));
+
+            // create service location configuration
+            final ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
+            config.setNumAttempts(3);
+            config.setTimeBetweenAttempts(1);
+            config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+
+            serviceLocator = new ClusterServiceLocator(serviceDiscovery);
+            serviceLocator.setAttemptsConfig(config);
+        } else {
+            final InetSocketAddress serviceAddress = 
getClusterNodeManagerAuthorityProviderAddress(configurationContext);
+            final DiscoverableService service = new 
DiscoverableServiceImpl(ClusterManagerAuthorizationProvider.AUTHORITY_PROVIDER_SERVIVE_NAME,
 serviceAddress);
+            serviceLocator = new ClusterServiceLocator(service);
+        }
+
+        try {
+            // start the service locator
+            serviceLocator.start();
+        } catch (final IOException ioe) {
+            throw new ProviderCreationException(ioe);
+        }
+
+        // the socket configuration
+        socketConfiguration = 
applicationContext.getBean("protocolSocketConfiguration", 
SocketConfiguration.class);
+
+        // initialize the protocol context
+        authorityProviderProtocolContext = new 
JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT);
+    }
+
+    private InetSocketAddress 
getClusterNodeManagerAuthorityProviderAddress(final 
AuthorityProviderConfigurationContext configurationContext) {
+        try {
+            String socketAddress = 
properties.getProperty(CLUSTER_NODE_UNICAST_MANAGER_ADDRESS);
+            if (StringUtils.isBlank(socketAddress)) {
+                socketAddress = "localhost";
+            }
+            return InetSocketAddress.createUnresolved(socketAddress, 
getClusterNodeManagerAuthorityProviderPort(configurationContext));
+        } catch (Exception ex) {
+            throw new ProviderCreationException("Invalid cluster manager 
authority provider address/port due to: " + ex, ex);
+        }
+    }
+
+    private Integer getClusterNodeManagerAuthorityProviderPort(final 
AuthorityProviderConfigurationContext configurationContext) {
+        final String nodeAuthorityProviderPort = 
configurationContext.getProperty(CLUSTER_NODE_MANAGER_AUTHORITY_PROVIDER_PORT);
+        if (nodeAuthorityProviderPort == null || 
nodeAuthorityProviderPort.trim().isEmpty()) {
+            throw new ProviderCreationException("The cluster manager authority 
provider port must be specified.");
+        }
+
+        return Integer.parseInt(nodeAuthorityProviderPort);
+    }
+
+    @Override
+    public void setAuthorities(String dn, Set<Authority> authorities) throws 
AuthorityAccessException {
+        throw new AuthorityAccessException("Nodes are not allowed to set user 
authorities.");
+    }
+
+    @Override
+    public void addUser(String dn, String group) throws 
IdentityAlreadyExistsException, AuthorityAccessException {
+        throw new AuthorityAccessException("Nodes are not allowed to add 
users.");
+    }
+
+    @Override
+    public boolean doesDnExist(String dn) throws AuthorityAccessException {
+        // create message
+        final DoesDnExistMessage msg = new DoesDnExistMessage();
+        msg.setDn(dn);
+
+        Socket socket = null;
+        try {
+
+            final InetSocketAddress socketAddress = getServiceAddress();
+            if (socketAddress == null) {
+                throw new AuthorityAccessException("Cluster Authority 
Provider's address is not known.");
+            }
+
+            try {
+                // create a socket
+                socket = SocketUtils.createSocket(socketAddress, 
socketConfiguration);
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed to create socket 
due to: " + ioe, ioe);
+            }
+
+            try {
+                // marshal message to output stream
+                final ProtocolMessageMarshaller marshaller = 
authorityProviderProtocolContext.createMarshaller();
+                marshaller.marshal(msg, socket.getOutputStream());
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed marshalling '" + 
msg.getType() + "' protocol message due to: " + ioe, ioe);
+            }
+
+            try {
+
+                // unmarshall response and return
+                final ProtocolMessageUnmarshaller<ProtocolMessage> 
unmarshaller = authorityProviderProtocolContext.createUnmarshaller();
+                final DoesDnExistMessage response = (DoesDnExistMessage) 
unmarshaller.unmarshal(socket.getInputStream());
+
+                // check if there was an exception
+                if (response.wasException()) {
+                    throw new 
AuthorityAccessException(response.getExceptionMessage());
+                }
+
+                // return provider's response
+                return response.getResponse();
+
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed unmarshalling '" + 
msg.getType() + "' response protocol message due to: " + ioe, ioe);
+            }
+
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    @Override
+    public Set<Authority> getAuthorities(String dn) throws 
UnknownIdentityException, AuthorityAccessException {
+        // create message
+        final GetAuthoritiesMessage msg = new GetAuthoritiesMessage();
+        msg.setDn(dn);
+
+        Socket socket = null;
+        try {
+
+            final InetSocketAddress socketAddress = getServiceAddress();
+            if (socketAddress == null) {
+                throw new AuthorityAccessException("Cluster Authority 
Provider's address is not known.");
+            }
+
+            try {
+                // create a socket
+                socket = SocketUtils.createSocket(socketAddress, 
socketConfiguration);
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed to create socket 
due to: " + ioe, ioe);
+            }
+
+            try {
+                // marshal message to output stream
+                final ProtocolMessageMarshaller marshaller = 
authorityProviderProtocolContext.createMarshaller();
+                marshaller.marshal(msg, socket.getOutputStream());
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed marshalling '" + 
msg.getType() + "' protocol message due to: " + ioe, ioe);
+            }
+
+            try {
+
+                // unmarshall response and return
+                final ProtocolMessageUnmarshaller<ProtocolMessage> 
unmarshaller = authorityProviderProtocolContext.createUnmarshaller();
+                final GetAuthoritiesMessage response = (GetAuthoritiesMessage) 
unmarshaller.unmarshal(socket.getInputStream());
+
+                // check if there was an exception
+                if (response.wasException()) {
+                    if (isException(UnknownIdentityException.class, response)) 
{
+                        throw new 
UnknownIdentityException(response.getExceptionMessage());
+                    } else {
+                        throw new 
AuthorityAccessException(response.getExceptionMessage());
+                    }
+                }
+
+                // return provider's response
+                return response.getResponse();
+
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed unmarshalling '" + 
msg.getType() + "' response protocol message due to: " + ioe, ioe);
+            }
+
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    @Override
+    public Set<String> getUsers(Authority authority) throws 
AuthorityAccessException {
+        throw new AuthorityAccessException("Nodes are not allowed to get users 
for a given authority.");
+    }
+
+    @Override
+    public void revokeUser(String dn) throws UnknownIdentityException, 
AuthorityAccessException {
+        throw new AuthorityAccessException("Nodes are not allowed to revoke 
users.");
+    }
+
+    @Override
+    public void setUsersGroup(Set<String> dns, String group) throws 
UnknownIdentityException, AuthorityAccessException {
+        throw new AuthorityAccessException("Nodes are not allowed to set user 
groups.");
+    }
+
+    @Override
+    public void ungroupUser(String dn) throws UnknownIdentityException, 
AuthorityAccessException {
+        throw new AuthorityAccessException("Nodes are not allowed to ungroup 
users.");
+    }
+
+    @Override
+    public void ungroup(String group) throws AuthorityAccessException {
+        throw new AuthorityAccessException("Nodes are not allowed to 
ungroup.");
+    }
+
+    @Override
+    public String getGroupForUser(String dn) throws UnknownIdentityException, 
AuthorityAccessException {
+        // create message
+        final GetGroupForUserMessage msg = new GetGroupForUserMessage();
+        msg.setDn(dn);
+
+        Socket socket = null;
+        try {
+
+            final InetSocketAddress socketAddress = getServiceAddress();
+            if (socketAddress == null) {
+                throw new AuthorityAccessException("Cluster Authority 
Provider's address is not known.");
+            }
+
+            try {
+                // create a socket
+                socket = SocketUtils.createSocket(socketAddress, 
socketConfiguration);
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed to create socket 
due to: " + ioe, ioe);
+            }
+
+            try {
+                // marshal message to output stream
+                final ProtocolMessageMarshaller marshaller = 
authorityProviderProtocolContext.createMarshaller();
+                marshaller.marshal(msg, socket.getOutputStream());
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed marshalling '" + 
msg.getType() + "' protocol message due to: " + ioe, ioe);
+            }
+
+            try {
+
+                // unmarshall response and return
+                final ProtocolMessageUnmarshaller<ProtocolMessage> 
unmarshaller = authorityProviderProtocolContext.createUnmarshaller();
+                final GetGroupForUserMessage response = 
(GetGroupForUserMessage) unmarshaller.unmarshal(socket.getInputStream());
+
+                // check if there was an exception
+                if (response.wasException()) {
+                    if (isException(UnknownIdentityException.class, response)) 
{
+                        throw new 
UnknownIdentityException(response.getExceptionMessage());
+                    } else {
+                        throw new 
AuthorityAccessException(response.getExceptionMessage());
+                    }
+                }
+
+                return response.getResponse();
+            } catch (final IOException ioe) {
+                throw new AuthorityAccessException("Failed unmarshalling '" + 
msg.getType() + "' response protocol message due to: " + ioe, ioe);
+            }
+
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    @Override
+    public void revokeGroup(String group) throws UnknownIdentityException, 
AuthorityAccessException {
+        throw new AuthorityAccessException("Nodes are not allowed to revoke 
groups.");
+    }
+
+    @Override
+    public void preDestruction() throws ProviderDestructionException {
+        try {
+            if (serviceLocator != null && serviceLocator.isRunning()) {
+                serviceLocator.stop();
+            }
+        } catch (final IOException ioe) {
+            throw new ProviderDestructionException(ioe);
+        }
+    }
+
+    @Override
+    @AuthorityProviderContext
+    public void setApplicationContext(final ApplicationContext 
applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+    @AuthorityProviderContext
+    public void setNiFiProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+
+    private InetSocketAddress getServiceAddress() {
+        final DiscoverableService service = serviceLocator.getService();
+        if (service != null) {
+            return service.getServiceAddress();
+        }
+        return null;
+    }
+
+    private boolean isException(final Class<? extends Exception> exception, 
final ProtocolMessage protocolMessage) {
+        if (protocolMessage.wasException()) {
+            return 
exception.getName().equals(protocolMessage.getExceptionClass());
+        } else {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/DoesDnExistMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/DoesDnExistMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/DoesDnExistMessage.java
new file mode 100644
index 0000000..38d0dd8
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/DoesDnExistMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.authorization.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import 
org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage.MessageType;
+
+/**
+ * @author unattributed
+ */
+@XmlRootElement(name = "doesDnExistMessage")
+public class DoesDnExistMessage extends ProtocolMessage {
+
+    private String dn;
+
+    private boolean response;
+
+    public DoesDnExistMessage() {
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.DOES_DN_EXIST;
+    }
+
+    public String getDn() {
+        return dn;
+    }
+
+    public void setDn(String dn) {
+        this.dn = dn;
+    }
+
+    public boolean getResponse() {
+        return response;
+    }
+
+    public void setResponse(boolean response) {
+        this.response = response;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetAuthoritiesMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetAuthoritiesMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetAuthoritiesMessage.java
new file mode 100644
index 0000000..347163f
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetAuthoritiesMessage.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.authorization.protocol.message;
+
+import java.util.HashSet;
+import java.util.Set;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.authorization.Authority;
+
+/**
+ * @author unattributed
+ */
+@XmlRootElement(name = "getAuthoritiesMessage")
+public class GetAuthoritiesMessage extends ProtocolMessage {
+
+    private String dn;
+
+    private Set<Authority> response = new HashSet<>();
+
+    public GetAuthoritiesMessage() {
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.GET_AUTHORITIES;
+    }
+
+    public String getDn() {
+        return dn;
+    }
+
+    public void setDn(String dn) {
+        this.dn = dn;
+    }
+
+    public Set<Authority> getResponse() {
+        return response;
+    }
+
+    public void setResponse(Set<Authority> response) {
+        this.response = response;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetGroupForUserMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetGroupForUserMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetGroupForUserMessage.java
new file mode 100644
index 0000000..717f244
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetGroupForUserMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.authorization.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * @author unattributed
+ */
+@XmlRootElement(name = "getGroupForUserMessage")
+public class GetGroupForUserMessage extends ProtocolMessage {
+
+    private String dn;
+
+    private String response;
+
+    public GetGroupForUserMessage() {
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.GET_GROUP_FOR_USER;
+    }
+
+    public String getDn() {
+        return dn;
+    }
+
+    public void setDn(String dn) {
+        this.dn = dn;
+    }
+
+    public String getResponse() {
+        return response;
+    }
+
+    public void setResponse(String response) {
+        this.response = response;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/ProtocolMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/ProtocolMessage.java
new file mode 100644
index 0000000..102142a
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/ProtocolMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.authorization.protocol.message;
+
+/**
+ * @author unattributed
+ */
+public abstract class ProtocolMessage {
+
+    private String exceptionClass;
+    private String exceptionMessage;
+
+    public static enum MessageType {
+
+        DOES_DN_EXIST,
+        GET_AUTHORITIES,
+        GET_USERS,
+        GET_GROUP_FOR_USER
+    }
+
+    public abstract MessageType getType();
+
+    public boolean wasException() {
+        return exceptionClass != null;
+    }
+
+    public String getExceptionMessage() {
+        return exceptionMessage;
+    }
+
+    public void setExceptionMessage(final String exceptionMessage) {
+        this.exceptionMessage = exceptionMessage;
+    }
+
+    public String getExceptionClass() {
+        return exceptionClass;
+    }
+
+    public void setExceptionClass(String exceptionClass) {
+        this.exceptionClass = exceptionClass;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/JaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/JaxbProtocolUtils.java
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/JaxbProtocolUtils.java
new file mode 100644
index 0000000..97a1bc7
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/JaxbProtocolUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.authorization.protocol.message.jaxb;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+
+/**
+ * @author unattributed
+ */
+public final class JaxbProtocolUtils {
+
+    public static final String JAXB_CONTEXT_PATH = 
ObjectFactory.class.getPackage().getName();
+
+    public static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Load the JAXBContext version.
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_CONTEXT_PATH);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/ObjectFactory.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/ObjectFactory.java
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/ObjectFactory.java
new file mode 100644
index 0000000..5cde335
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/ObjectFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.authorization.protocol.message.jaxb;
+
+import javax.xml.bind.annotation.XmlRegistry;
+import 
org.apache.nifi.cluster.authorization.protocol.message.DoesDnExistMessage;
+import 
org.apache.nifi.cluster.authorization.protocol.message.GetAuthoritiesMessage;
+import 
org.apache.nifi.cluster.authorization.protocol.message.GetGroupForUserMessage;
+
+/**
+ * @author unattributed
+ */
+@XmlRegistry
+public class ObjectFactory {
+
+    public ObjectFactory() {
+    }
+
+    public DoesDnExistMessage createDoesDnExistMessage() {
+        return new DoesDnExistMessage();
+    }
+
+    public GetAuthoritiesMessage createGetAuthoritiesMessage() {
+        return new GetAuthoritiesMessage();
+    }
+
+    public GetGroupForUserMessage createGetGroupForUserMessage() {
+        return new GetGroupForUserMessage();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
new file mode 100644
index 0000000..56f4c3e
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.cluster.authorization.ClusterManagerAuthorizationProvider
+org.apache.nifi.cluster.authorization.NodeAuthorizationProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/.gitignore
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/.gitignore 
b/nar-bundles/framework-bundle/framework/cluster-protocol/.gitignore
new file mode 100755
index 0000000..ea8c4bf
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml 
b/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml
new file mode 100644
index 0000000..5351085
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-framework-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>framework-cluster-protocol</artifactId>
+    <packaging>jar</packaging>
+    <name>NiFi Framework Cluster Protocol</name>
+    <description>The messaging protocol for clustered NiFi</description>
+    <dependencies>
+        
+        <!-- application dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-logging-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-socket-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>core-api</artifactId>
+        </dependency>
+        
+        <!-- spring dependencies -->
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-beans</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
new file mode 100644
index 0000000..fa1547f
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
+import org.apache.nifi.reporting.BulletinRepository;
+
+/**
+ * An interface for sending protocol messages from the cluster manager to 
nodes.
+ * 
+ * @author unattributed
+ */
+public interface ClusterManagerProtocolSender {
+    
+    /**
+     * Sends a "flow request" message to a node.
+     * @param msg a message
+     * @return the response
+     * @throws ProtocolException if communication failed 
+     */
+    FlowResponseMessage requestFlow(FlowRequestMessage msg) throws 
ProtocolException;
+
+    /**
+     * Sends a "reconnection request" message to a node.
+     * @param msg a message
+     * @return 
+     * @throws ProtocolException if communication failed
+     */
+    ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage 
msg) throws ProtocolException;
+    
+    /**
+     * Sends a "disconnection request" message to a node.
+     * @param msg a message
+     * @throws ProtocolException if communication failed
+     */
+    void disconnect(DisconnectMessage msg) throws ProtocolException;
+    
+    /**
+     * Sends an "assign primary role" message to a node.
+     * @param msg a message
+     * @throws ProtocolException if communication failed 
+     */
+    void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws 
ProtocolException;
+
+    /**
+     * Sets the {@link BulletinRepository} that can be used to report bulletins
+     * @param bulletinRepository
+     */
+    void setBulletinRepository(final BulletinRepository bulletinRepository);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
new file mode 100644
index 0000000..1b5d007
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionRequestAdapter;
+
+/**
+ * A node's request to connect to the cluster.  The request contains a proposed
+ * identifier.
+ * 
+ * @author unattributed
+ */
+@XmlJavaTypeAdapter(ConnectionRequestAdapter.class)
+public class ConnectionRequest {
+
+    private final NodeIdentifier proposedNodeIdentifier;
+
+    public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier) {
+        if(proposedNodeIdentifier == null) {
+            throw new IllegalArgumentException("Proposed node identifier may 
not be null.");
+        }
+        this.proposedNodeIdentifier = proposedNodeIdentifier;
+    }
+
+    public NodeIdentifier getProposedNodeIdentifier() {
+        return proposedNodeIdentifier;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
new file mode 100644
index 0000000..7a5ff2b
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter;
+
+/**
+ * The cluster manager's response to a node's connection request.  If the 
manager
+ * has a current copy of the data flow, then it is returned with a node 
identifier
+ * to the node.  Otherwise, the manager will provide a "try again in X 
seconds" 
+ * response to the node in hopes that a current data flow will be available 
upon
+ * subsequent requests.
+ * 
+ * @author unattributed
+ */
+@XmlJavaTypeAdapter(ConnectionResponseAdapter.class)
+public class ConnectionResponse {
+
+    private final boolean blockedByFirewall;
+    private final int tryLaterSeconds;
+    private final NodeIdentifier nodeIdentifier;
+    private final StandardDataFlow dataFlow;
+    private final boolean primary;
+    private final Integer managerRemoteInputPort;
+    private final Boolean managerRemoteCommsSecure;
+    private final String instanceId;
+    
+    private volatile String clusterManagerDN;
+    
+    public ConnectionResponse(final NodeIdentifier nodeIdentifier, final 
StandardDataFlow dataFlow, final boolean primary, 
+        final Integer managerRemoteInputPort, final Boolean 
managerRemoteCommsSecure, final String instanceId) {
+        if(nodeIdentifier == null) {
+            throw new IllegalArgumentException("Node identifier may not be 
empty or null.");
+        } else if(dataFlow == null) {
+            throw new IllegalArgumentException("DataFlow may not be null.");
+        }
+        this.nodeIdentifier = nodeIdentifier;
+        this.dataFlow = dataFlow;
+        this.tryLaterSeconds = 0;
+        this.blockedByFirewall = false;
+        this.primary = primary;
+        this.managerRemoteInputPort = managerRemoteInputPort;
+        this.managerRemoteCommsSecure = managerRemoteCommsSecure;
+        this.instanceId = instanceId;
+    }
+    
+    public ConnectionResponse(final int tryLaterSeconds) {
+        if(tryLaterSeconds <= 0) {
+            throw new IllegalArgumentException("Try-Later seconds may not be 
nonnegative: " + tryLaterSeconds);
+        }
+        this.dataFlow = null;
+        this.nodeIdentifier = null;
+        this.tryLaterSeconds = tryLaterSeconds;
+        this.blockedByFirewall = false;
+        this.primary = false;
+        this.managerRemoteInputPort = null;
+        this.managerRemoteCommsSecure = null;
+        this.instanceId = null;
+    }
+
+    private ConnectionResponse() {
+        this.dataFlow = null;
+        this.nodeIdentifier = null;
+        this.tryLaterSeconds = 0;
+        this.blockedByFirewall = true;
+        this.primary = false;
+        this.managerRemoteInputPort = null;
+        this.managerRemoteCommsSecure = null;
+        this.instanceId = null;
+    }
+    
+    public static ConnectionResponse createBlockedByFirewallResponse() {
+        return new ConnectionResponse();
+    }
+    
+    public boolean isPrimary() {
+        return primary;
+    }
+    
+    public boolean shouldTryLater() {
+        return tryLaterSeconds > 0;
+    }
+    
+    public boolean isBlockedByFirewall() {
+        return blockedByFirewall;
+    }
+
+    public int getTryLaterSeconds() {
+        return tryLaterSeconds;
+    }
+    
+    public StandardDataFlow getDataFlow() {
+        return dataFlow;
+    }
+    
+    public NodeIdentifier getNodeIdentifier() {
+        return nodeIdentifier;
+    }
+
+    public Integer getManagerRemoteInputPort() {
+        return managerRemoteInputPort;
+    }
+    
+    public Boolean isManagerRemoteCommsSecure() {
+        return managerRemoteCommsSecure;
+    }
+    
+    public String getInstanceId() {
+        return instanceId;
+    }
+    
+    public void setClusterManagerDN(final String dn) {
+        this.clusterManagerDN = dn;
+    }
+    
+    /**
+     * Returns the DN of the NCM, if it is available or <code>null</code> 
otherwise.
+     * 
+     * @return
+     */
+    public String getClusterManagerDN() {
+        return clusterManagerDN;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
new file mode 100644
index 0000000..67324a1
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import java.util.Date;
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter;
+
+/**
+ * A heartbeat for indicating the status of a node to the cluster.
+ * @author unattributed
+ */
+@XmlJavaTypeAdapter(HeartbeatAdapter.class)
+public class Heartbeat {
+    
+    private final NodeIdentifier nodeIdentifier;
+    private final boolean primary;
+    private final boolean connected;
+    private final long createdTimestamp;
+    private final byte[] payload;
+    
+    public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean 
primary, final boolean connected, final byte[] payload) {
+        if(nodeIdentifier == null) {
+            throw new IllegalArgumentException("Node Identifier may not be 
null.");
+        } 
+        this.nodeIdentifier = nodeIdentifier;
+        this.primary = primary;
+        this.connected = connected;
+        this.payload = payload;
+        this.createdTimestamp = new Date().getTime();
+    }
+    
+    public NodeIdentifier getNodeIdentifier() {
+        return nodeIdentifier;
+    }
+    
+    public byte[] getPayload() {
+        return payload;
+    }
+    
+    public boolean isPrimary() {
+        return primary;
+    }
+    
+    public boolean isConnected() {
+        return connected;
+    }
+    
+    @XmlTransient
+    public long getCreatedTimestamp() {
+        return createdTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
new file mode 100644
index 0000000..a120524
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeBulletinsAdapter;
+
+/**
+ *
+ */
+@XmlJavaTypeAdapter(NodeBulletinsAdapter.class)
+public class NodeBulletins {
+
+    private final NodeIdentifier nodeIdentifier;
+    private final byte[] payload;
+
+    public NodeBulletins(NodeIdentifier nodeIdentifier, byte[] payload) {
+        this.nodeIdentifier = nodeIdentifier;
+        this.payload = payload;
+    }
+
+    public NodeIdentifier getNodeIdentifier() {
+        return nodeIdentifier;
+    }
+
+    public byte[] getPayload() {
+        return payload;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
new file mode 100644
index 0000000..1893186
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * A node identifier denoting the coordinates of a flow controller that is 
connected 
+ * to a cluster.  Nodes provide an external public API interface and an 
internal private
+ * interface for communicating with the cluster.
+ * 
+ * The external API interface and internal protocol each require an IP or 
hostname 
+ * as well as a port for communicating. 
+ * 
+ * This class overrides hashCode and equals and considers two instances to be
+ * equal if they have the equal IDs.
+ * 
+ * @author unattributed
+ * @Immutable
+ * @Threadsafe
+ */
+public class NodeIdentifier {
+ 
+    /** the unique identifier for the node */
+    private final String id;
+    
+    /** the IP or hostname to use for sending requests to the node's external 
interface */
+    private final String apiAddress;
+    
+    /** the port to use use for sending requests to the node's external 
interface */
+    private final int apiPort;    
+    
+    /** the IP or hostname to use for sending requests to the node's internal 
interface */
+    private final String socketAddress;
+    
+    /** the port to use use for sending requests to the node's internal 
interface */
+    private final int socketPort;
+    
+    private final String nodeDn;
+
+    public NodeIdentifier(final String id, final String apiAddress, final int 
apiPort, final String socketAddress, final int socketPort) {
+        this(id, apiAddress, apiPort, socketAddress, socketPort, null);
+    }
+    
+    public NodeIdentifier(final String id, final String apiAddress, final int 
apiPort, final String socketAddress, final int socketPort, final String dn) {
+        
+        if(StringUtils.isBlank(id)) {
+            throw new IllegalArgumentException("Node ID may not be empty or 
null.");
+        } else if(StringUtils.isBlank(apiAddress)) {
+            throw new IllegalArgumentException("Node API address may not be 
empty or null.");
+        } else if(StringUtils.isBlank(socketAddress)) {
+            throw new IllegalArgumentException("Node socket address may not be 
empty or null.");
+        } 
+        
+        validatePort(apiPort);
+        validatePort(socketPort);
+        
+        this.id = id;
+        this.apiAddress = apiAddress;
+        this.apiPort = apiPort;
+        this.socketAddress = socketAddress;
+        this.socketPort = socketPort;
+        this.nodeDn = dn;
+    }
+
+    public String getId() {
+        return id;
+    }
+    
+    public String getDN() {
+        return nodeDn;
+    }
+    
+    public String getApiAddress() {
+        return apiAddress;
+    }
+
+    public int getApiPort() {
+        return apiPort;
+    }
+
+    public String getSocketAddress() {
+        return socketAddress;
+    }
+    
+    public int getSocketPort() {
+        return socketPort;
+    }
+    
+    private void validatePort(final int port) {
+        if(port < 1 || port > 65535) {
+            throw new IllegalArgumentException("Port must be inclusively in 
the range [1, 65535].  Port given: " + port);
+        }   
+    }
+    
+    /**
+     * Compares the id of two node identifiers for equality.
+     * 
+     * @param obj a node identifier
+     * 
+     * @return true if the id is equal; false otherwise
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final NodeIdentifier other = (NodeIdentifier) obj;
+        if ((this.id == null) ? (other.id != null) : 
!this.id.equals(other.id)) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Compares API address/port and socket address/port for equality.  The 
+     * id is not used for comparison.
+     * 
+     * @param other a node identifier
+     * 
+     * @return true if API address/port and socket address/port are equal; 
false
+     * otherwise
+     */
+    public boolean logicallyEquals(final NodeIdentifier other) {
+        if(other == null) {
+            return false;
+        }
+        if ((this.apiAddress == null) ? (other.apiAddress != null) : 
!this.apiAddress.equals(other.apiAddress)) {
+            return false;
+        }
+        if(this.apiPort != other.apiPort) {
+            return false;
+        }
+        if ((this.socketAddress == null) ? (other.socketAddress != null) : 
!this.socketAddress.equals(other.socketAddress)) {
+            return false;
+        }
+        if(this.socketPort != other.socketPort) {
+            return false;
+        }
+        return true;
+    }
+    
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 31 * hash + (this.id != null ? this.id.hashCode() : 0);
+        return hash;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + "id=" + id + ", apiAddress=" + apiAddress + ", apiPort=" 
+ apiPort + ", socketAddress=" + socketAddress + ", socketPort=" + socketPort + 
']';
+    }
+
+}

Reply via email to