http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
new file mode 100644
index 0000000..3aa2931
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.ws.rs.core.Response.Status;
+
+/**
+ * Encapsulates an HTTP response. The toString method returns the
+ * specification-compliant response.
+ *
+ * @author unattributed
+ */
+public class HttpResponse {
+
+    private final Status status;
+    private final String entity;
+    private final Map<String, String> headers = new HashMap<>();
+
+    public HttpResponse(final Status status, final String entity) {
+        this.status = status;
+        this.entity = entity;
+        headers.put("content-length", 
String.valueOf(entity.getBytes().length));
+    }
+
+    public String getEntity() {
+        return entity;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    public Map<String, String> getHeaders() {
+        return Collections.unmodifiableMap(headers);
+    }
+
+    public void addHeader(final String key, final String value) {
+        if (key.contains(" ")) {
+            throw new IllegalArgumentException("Header key may not contain 
spaces.");
+        } else if ("content-length".equalsIgnoreCase(key)) {
+            throw new IllegalArgumentException("Content-Length header is set 
automatically based on length of content.");
+        }
+        headers.put(key, value);
+    }
+
+    public void addHeaders(final Map<String, String> headers) {
+        for (final Map.Entry<String, String> entry : headers.entrySet()) {
+            addHeader(entry.getKey(), entry.getValue());
+        }
+    }
+
+    @Override
+    public String toString() {
+
+        final StringBuilder strb = new StringBuilder();
+
+        // response line
+        strb.append("HTTP/1.1 ")
+                .append(status.getStatusCode())
+                .append(" ")
+                .append(status.getReasonPhrase())
+                .append("\n");
+
+        // headers
+        for (final Map.Entry<String, String> entry : headers.entrySet()) {
+            strb.append(entry.getKey()).append(": 
").append(entry.getValue()).append("\n");
+        }
+
+        strb.append("\n");
+
+        // body
+        strb.append(entity);
+
+        return strb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
new file mode 100644
index 0000000..28615d0
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+/**
+ * Wraps a HttpResponse with a time-delay. When the action is applied, the
+ * currently executing thread sleeps for the given delay before returning the
+ * response to the caller.
+ *
+ * This class is good for simulating network latency.
+ *
+ * @author unattributed
+ */
+public class HttpResponseAction {
+
+    private final HttpResponse response;
+
+    private final int waitTimeMs;
+
+    public HttpResponseAction(final HttpResponse response) {
+        this(response, 0);
+    }
+
+    public HttpResponseAction(final HttpResponse response, final int 
waitTimeMs) {
+        this.response = response;
+        this.waitTimeMs = waitTimeMs;
+    }
+
+    public HttpResponse apply() {
+        try {
+            Thread.sleep(waitTimeMs);
+        } catch (final InterruptedException ie) {
+            throw new RuntimeException(ie);
+        }
+
+        return response;
+    }
+
+    public HttpResponse getResponse() {
+        return response;
+    }
+
+    public int getWaitTimeMs() {
+        return waitTimeMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
new file mode 100644
index 0000000..f17a66c
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.nifi.cluster.manager.testutils.HttpRequest.HttpRequestBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple HTTP web server that allows clients to register canned-responses to
+ * respond to received requests.
+ *
+ * @author unattributed
+ */
+public class HttpServer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpServer.class);
+
+    private final ExecutorService executorService;
+    private final ServerSocket serverSocket;
+    private final Queue<HttpResponseAction> responseQueue = new 
ConcurrentLinkedQueue<>();
+    private final Map<String, String> checkedHeaders = new HashMap<>();
+    private final Map<String, List<String>> checkedParameters = new 
HashMap<>();
+    private final int port;
+
+    public HttpServer(int numThreads, int port) throws IOException {
+        this.port = port;
+        executorService = Executors.newFixedThreadPool(numThreads);
+        serverSocket = new ServerSocket(port);
+    }
+
+    public void start() {
+
+        new Thread() {
+            @Override
+            public void run() {
+                while (isRunning()) {
+                    try {
+                        final Socket conn = serverSocket.accept();
+                        executorService.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                handleRequest(conn);
+                                if (conn.isClosed() == false) {
+                                    try {
+                                        conn.close();
+                                    } catch (IOException ioe) {
+                                    }
+                                }
+                            }
+                        });
+                    } catch (final SocketException se) {
+                        /* ignored */
+                    } catch (final IOException ioe) {
+                        if (logger.isDebugEnabled()) {
+                            logger.warn("", ioe);
+                        }
+                    }
+                }
+            }
+        ;
+    }
+
+    .start();
+    }
+
+    public boolean isRunning() {
+        return executorService.isShutdown() == false;
+    }
+
+    public void stop() {
+        // shutdown server socket
+        try {
+            if (serverSocket.isClosed() == false) {
+                serverSocket.close();
+            }
+        } catch (final Exception ex) {
+            throw new RuntimeException(ex);
+        }
+
+        // shutdown executor service
+        try {
+            executorService.shutdown();
+            executorService.awaitTermination(3, TimeUnit.SECONDS);
+        } catch (final Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    public int getPort() {
+        if (isRunning()) {
+            return serverSocket.getLocalPort();
+        } else {
+            return port;
+        }
+    }
+
+    public Queue<HttpResponseAction> addResponseAction(final 
HttpResponseAction response) {
+        responseQueue.add(response);
+        return responseQueue;
+    }
+
+    public void addCheckedHeaders(final Map<String, String> headers) {
+        checkedHeaders.putAll(headers);
+    }
+
+    public void addCheckedParameters(final Map<String, List<String>> 
parameters) {
+        checkedParameters.putAll(parameters);
+    }
+
+    private void handleRequest(final Socket conn) {
+        try {
+
+            final HttpRequest httpRequest = 
buildRequest(conn.getInputStream());
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("\n" + httpRequest);
+            }
+
+            // check headers
+            final Map<String, String> reqHeaders = httpRequest.getHeaders();
+            for (final Map.Entry<String, String> entry : 
checkedHeaders.entrySet()) {
+                if (reqHeaders.containsKey(entry.getKey())) {
+                    if 
(entry.getValue().equals(reqHeaders.get(entry.getKey()))) {
+                        logger.error("Incorrect HTTP request header value 
received for checked header: " + entry.getKey());
+                        conn.close();
+                        return;
+                    }
+                } else {
+                    logger.error("Missing checked header: " + entry.getKey());
+                    conn.close();
+                    return;
+                }
+            }
+
+            // check parameters
+            final Map<String, List<String>> reqParams = 
httpRequest.getParameters();
+            for (final Map.Entry<String, List<String>> entry : 
checkedParameters.entrySet()) {
+                if (reqParams.containsKey(entry.getKey())) {
+                    if (entry.getValue().equals(reqParams.get(entry.getKey())) 
== false) {
+                        logger.error("Incorrect HTTP request parameter values 
received for checked parameter: " + entry.getKey());
+                        conn.close();
+                        return;
+                    }
+                } else {
+                    logger.error("Missing checked parameter: " + 
entry.getKey());
+                    conn.close();
+                    return;
+                }
+            }
+
+            // apply the next response
+            final HttpResponseAction response = responseQueue.remove();
+            response.apply();
+
+            // send the response to client
+            final PrintWriter pw = new PrintWriter(conn.getOutputStream(), 
true);
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("\n" + response.getResponse());
+            }
+
+            pw.print(response.getResponse());
+            pw.flush();
+
+        } catch (IOException ioe) { /* ignored */ }
+    }
+
+    private HttpRequest buildRequest(final InputStream requestIs) throws 
IOException {
+        return new HttpRequestReader().read(new InputStreamReader(requestIs));
+    }
+
+    // reads an HTTP request from the given reader
+    private class HttpRequestReader {
+
+        public HttpRequest read(final Reader reader) throws IOException {
+
+            HttpRequestBuilder builder = null;
+            String line = "";
+            boolean isRequestLine = true;
+            while ((line = readLine(reader)).isEmpty() == false) {
+                if (isRequestLine) {
+                    builder = HttpRequest.createFromRequestLine(line);
+                    isRequestLine = false;
+                } else {
+                    builder.addHeader(line);
+                }
+            }
+
+            if (builder != null) {
+                builder.addBody(reader);
+            }
+
+            return builder.build();
+        }
+
+        private String readLine(final Reader reader) throws IOException {
+
+            /* read character at time to prevent blocking */
+            final StringBuilder strb = new StringBuilder();
+            char c;
+            while ((c = (char) reader.read()) != '\n') {
+                if (c != '\r') {
+                    strb.append(c);
+                }
+            }
+            return strb.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
new file mode 100644
index 0000000..96943c2
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * @author unattributed
+ */
+public class ClusterManagerProtocolSenderImplTest {
+
+    private InetAddress address;
+
+    private int port;
+
+    private SocketProtocolListener listener;
+
+    private ClusterManagerProtocolSenderImpl sender;
+
+    private ProtocolHandler mockHandler;
+
+    @Before
+    public void setup() throws IOException {
+
+        address = InetAddress.getLocalHost();
+        ServerSocketConfiguration serverSocketConfiguration = new 
ServerSocketConfiguration();
+
+        mockHandler = mock(ProtocolHandler.class);
+
+        ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, 
protocolContext);
+        listener.addHandler(mockHandler);
+        listener.start();
+
+        port = listener.getPort();
+
+        SocketConfiguration socketConfiguration = new SocketConfiguration();
+        sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, 
protocolContext);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        if (listener.isRunning()) {
+            listener.stop();
+        }
+    }
+
+    @Test
+    public void testRequestFlow() throws Exception {
+
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
FlowResponseMessage());
+        FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
+        FlowResponseMessage response = sender.requestFlow(request);
+        assertNotNull(response);
+    }
+
+    @Test
+    public void testRequestFlowWithBadResponseMessage() throws Exception {
+
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
PingMessage());
+        FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
+        try {
+            sender.requestFlow(request);
+            fail("failed to throw exception");
+        } catch (ProtocolException pe) {
+        }
+
+    }
+
+    @Test
+    public void testRequestFlowDelayedResponse() throws Exception {
+
+        final int time = 250;
+        sender.getSocketConfiguration().setSocketTimeout(time);
+
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new 
Answer<FlowResponseMessage>() {
+            @Override
+            public FlowResponseMessage answer(InvocationOnMock invocation) 
throws Throwable {
+                Thread.sleep(time * 3);
+                return new FlowResponseMessage();
+            }
+        });
+        FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
+        try {
+            sender.requestFlow(request);
+            fail("failed to throw exception");
+        } catch (ProtocolException pe) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
new file mode 100644
index 0000000..4a69571
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+import org.mockito.stubbing.OngoingStubbing;
+
+public class ClusterServiceLocatorTest {
+
+    private ClusterServiceDiscovery mockServiceDiscovery;
+
+    private int fixedPort;
+
+    private DiscoverableService fixedService;
+
+    private ClusterServiceLocator serviceDiscoveryLocator;
+
+    private ClusterServiceLocator serviceDiscoveryFixedPortLocator;
+
+    private ClusterServiceLocator fixedServiceLocator;
+
+    @Before
+    public void setup() throws Exception {
+
+        fixedPort = 1;
+        mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
+        fixedService = new DiscoverableServiceImpl("some-service", 
InetSocketAddress.createUnresolved("some-host", 20));
+
+        serviceDiscoveryLocator = new 
ClusterServiceLocator(mockServiceDiscovery);
+        serviceDiscoveryFixedPortLocator = new 
ClusterServiceLocator(mockServiceDiscovery, fixedPort);
+        fixedServiceLocator = new ClusterServiceLocator(fixedService);
+
+    }
+
+    @Test
+    public void getServiceWhenServiceDiscoveryNotStarted() {
+        assertNull(serviceDiscoveryLocator.getService());
+    }
+
+    @Test
+    public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
+        assertNull(serviceDiscoveryLocator.getService());
+    }
+
+    @Test
+    public void getServiceWhenFixedServiceNotStarted() {
+        assertEquals(fixedService, fixedServiceLocator.getService());
+    }
+
+    @Test
+    public void getServiceNotOnFirstAttempt() {
+
+        ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
+        config.setNumAttempts(2);
+        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+        config.setTimeBetweenAttempts(1);
+
+        serviceDiscoveryLocator.setAttemptsConfig(config);
+
+        OngoingStubbing<DiscoverableService> stubbing = null;
+        for (int i = 0; i < config.getNumAttempts() - 1; i++) {
+            if (stubbing == null) {
+                stubbing = 
when(mockServiceDiscovery.getService()).thenReturn(null);
+            } else {
+                stubbing.thenReturn(null);
+            }
+        }
+        stubbing.thenReturn(fixedService);
+
+        assertEquals(fixedService, serviceDiscoveryLocator.getService());
+
+    }
+
+    @Test
+    public void getServiceNotOnFirstAttemptWithFixedPort() {
+
+        ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
+        config.setNumAttempts(2);
+        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+        config.setTimeBetweenAttempts(1);
+
+        serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);
+
+        OngoingStubbing<DiscoverableService> stubbing = null;
+        for (int i = 0; i < config.getNumAttempts() - 1; i++) {
+            if (stubbing == null) {
+                stubbing = 
when(mockServiceDiscovery.getService()).thenReturn(null);
+            } else {
+                stubbing.thenReturn(null);
+            }
+        }
+        stubbing.thenReturn(fixedService);
+
+        InetSocketAddress resultAddress = 
InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(),
 fixedPort);
+        DiscoverableService resultService = new 
DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
+        assertEquals(resultService, 
serviceDiscoveryFixedPortLocator.getService());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
new file mode 100644
index 0000000..4d85d1a
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.net.InetSocketAddress;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author unattributed
+ */
+public class ClusterServicesBroadcasterTest {
+
+    private ClusterServicesBroadcaster broadcaster;
+
+    private MulticastProtocolListener listener;
+
+    private DummyProtocolHandler handler;
+
+    private InetSocketAddress multicastAddress;
+
+    private DiscoverableService broadcastedService;
+
+    private ProtocolContext protocolContext;
+
+    private MulticastConfiguration configuration;
+
+    @Before
+    public void setup() throws Exception {
+
+        broadcastedService = new DiscoverableServiceImpl("some-service", new 
InetSocketAddress("localhost", 11111));
+
+        multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
+
+        configuration = new MulticastConfiguration();
+
+        protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        broadcaster = new ClusterServicesBroadcaster(multicastAddress, 
configuration, protocolContext, "500 ms");
+        broadcaster.addService(broadcastedService);
+
+        handler = new DummyProtocolHandler();
+        listener = new MulticastProtocolListener(5, multicastAddress, 
configuration, protocolContext);
+        listener.addHandler(handler);
+    }
+
+    @After
+    public void teardown() {
+
+        if (broadcaster.isRunning()) {
+            broadcaster.stop();
+        }
+
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace(System.out);
+        }
+
+    }
+
+    @Test
+    @Ignore
+    public void testBroadcastReceived() throws Exception {
+
+        broadcaster.start();
+        listener.start();
+
+        Thread.sleep(1000);
+
+        listener.stop();
+
+        assertNotNull(handler.getProtocolMessage());
+        assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, 
handler.getProtocolMessage().getType());
+        final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) 
handler.getProtocolMessage();
+        assertEquals(broadcastedService.getServiceName(), 
msg.getServiceName());
+        assertEquals(broadcastedService.getServiceAddress().getHostName(), 
msg.getAddress());
+        assertEquals(broadcastedService.getServiceAddress().getPort(), 
msg.getPort());
+    }
+
+    private class DummyProtocolHandler implements ProtocolHandler {
+
+        private ProtocolMessage protocolMessage;
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+            this.protocolMessage = msg;
+            return null;
+        }
+
+        public ProtocolMessage getProtocolMessage() {
+            return protocolMessage;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
new file mode 100644
index 0000000..acd21e8
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastUtils;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author unattributed
+ */
+public class MulticastProtocolListenerTest {
+
+    private MulticastProtocolListener listener;
+
+    private MulticastSocket socket;
+
+    private InetSocketAddress address;
+
+    private MulticastConfiguration configuration;
+
+    private ProtocolContext protocolContext;
+
+    @Before
+    public void setup() throws Exception {
+
+        address = new InetSocketAddress("226.1.1.1", 60000);
+        configuration = new MulticastConfiguration();
+
+        protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new MulticastProtocolListener(5, address, configuration, 
protocolContext);
+        listener.start();
+
+        socket = MulticastUtils.createMulticastSocket(address.getPort(), 
configuration);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } finally {
+            MulticastUtils.closeQuietly(socket);
+        }
+    }
+
+    @Test
+    public void testBadRequest() throws Exception {
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
+        listener.addHandler(handler);
+        DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address);
+        socket.send(packet);
+        Thread.sleep(250);
+        assertEquals(0, handler.getMessages().size());
+    }
+
+    @Test
+    @Ignore
+    public void testRequest() throws Exception {
+
+        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
+        listener.addHandler(handler);
+
+        ProtocolMessage msg = new PingMessage();
+        MulticastProtocolMessage multicastMsg = new 
MulticastProtocolMessage("some-id", msg);
+
+        // marshal message to output stream
+        ProtocolMessageMarshaller marshaller = 
protocolContext.createMarshaller();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        marshaller.marshal(multicastMsg, baos);
+        byte[] requestPacketBytes = baos.toByteArray();
+        DatagramPacket packet = new DatagramPacket(requestPacketBytes, 
requestPacketBytes.length, address);
+        socket.send(packet);
+
+        Thread.sleep(250);
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+
+    }
+
+    private class ReflexiveProtocolHandler implements ProtocolHandler {
+
+        private List<ProtocolMessage> messages = new ArrayList<>();
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+            messages.add(msg);
+            return msg;
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        public List<ProtocolMessage> getMessages() {
+            return messages;
+        }
+
+    }
+
+    private class DelayedProtocolHandler implements ProtocolHandler {
+
+        private int delay = 0;
+
+        private List<ProtocolMessage> messages = new ArrayList<>();
+
+        public DelayedProtocolHandler(int delay) {
+            this.delay = delay;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+            try {
+                messages.add(msg);
+                Thread.sleep(delay);
+                return null;
+            } catch (final InterruptedException ie) {
+                throw new ProtocolException(ie);
+            }
+
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        public List<ProtocolMessage> getMessages() {
+            return messages;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
new file mode 100644
index 0000000..7c62d2f
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import 
org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * @author unattributed
+ */
+public class NodeProtocolSenderImplTest {
+
+    private SocketProtocolListener listener;
+
+    private NodeProtocolSenderImpl sender;
+
+    private DiscoverableService service;
+
+    private ServerSocketConfiguration serverSocketConfiguration;
+
+    private ClusterServiceLocator mockServiceLocator;
+
+    private ProtocolHandler mockHandler;
+
+    private NodeIdentifier nodeIdentifier;
+
+    @Before
+    public void setup() throws IOException {
+
+        serverSocketConfiguration = new ServerSocketConfiguration();
+
+        mockServiceLocator = mock(ClusterServiceLocator.class);
+        mockHandler = mock(ProtocolHandler.class);
+
+        nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, 
"localhost", 5678);
+
+        ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, 
protocolContext);
+        listener.setShutdownListenerSeconds(3);
+        listener.addHandler(mockHandler);
+        listener.start();
+
+        service = new DiscoverableServiceImpl("some-service", new 
InetSocketAddress("localhost", listener.getPort()));
+
+        SocketConfiguration socketConfiguration = new SocketConfiguration();
+        socketConfiguration.setReuseAddress(true);
+        sender = new NodeProtocolSenderImpl(mockServiceLocator, 
socketConfiguration, protocolContext);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        if (listener.isRunning()) {
+            listener.stop();
+        }
+    }
+
+    @Test
+    public void testConnect() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        ConnectionResponseMessage mockMessage = new 
ConnectionResponseMessage();
+        mockMessage.setConnectionResponse(new 
ConnectionResponse(nodeIdentifier, new 
StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, 
null, null, UUID.randomUUID().toString()));
+        
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
+
+        ConnectionRequestMessage request = new ConnectionRequestMessage();
+        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+        ConnectionResponseMessage response = sender.requestConnection(request);
+        assertNotNull(response);
+    }
+
+    @Test(expected = UnknownServiceAddressException.class)
+    public void testConnectNoClusterManagerAddress() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(null);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
ConnectionResponseMessage());
+
+        ConnectionRequestMessage request = new ConnectionRequestMessage();
+        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+
+        sender.requestConnection(request);
+        fail("failed to throw exception");
+    }
+
+    @Test(expected = ProtocolException.class)
+    public void testConnectBadResponse() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
PingMessage());
+
+        ConnectionRequestMessage request = new ConnectionRequestMessage();
+        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+
+        sender.requestConnection(request);
+        fail("failed to throw exception");
+
+    }
+
+    @Test(expected = ProtocolException.class)
+    public void testConnectDelayedResponse() throws Exception {
+
+        final int time = 250;
+        sender.getSocketConfiguration().setSocketTimeout(time);
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new 
Answer<ConnectionResponseMessage>() {
+            @Override
+            public ConnectionResponseMessage answer(InvocationOnMock 
invocation) throws Throwable {
+                Thread.sleep(time * 3);
+                return new ConnectionResponseMessage();
+            }
+        });
+        ConnectionRequestMessage request = new ConnectionRequestMessage();
+        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+
+        sender.requestConnection(request);
+        fail("failed to throw exception");
+
+    }
+
+    @Test
+    public void testHeartbeat() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
+
+        HeartbeatMessage msg = new HeartbeatMessage();
+        HeartbeatPayload hbPayload = new HeartbeatPayload();
+        Heartbeat hb = new Heartbeat(new NodeIdentifier("id", "localhost", 3, 
"localhost", 4), false, false, hbPayload.marshal());
+        msg.setHeartbeat(hb);
+        sender.heartbeat(msg);
+    }
+
+    @Test
+    public void testNotifyControllerStartupFailure() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
+
+        ControllerStartupFailureMessage msg = new 
ControllerStartupFailureMessage();
+        msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, 
"some-addr", 1));
+        msg.setExceptionMessage("some exception");
+        sender.notifyControllerStartupFailure(msg);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
new file mode 100644
index 0000000..92a7d2a
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler;
+import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author unattributed
+ */
+public class SocketProtocolListenerTest {
+
+    private SocketProtocolListener listener;
+
+    private Socket socket;
+
+    private ProtocolMessageMarshaller<ProtocolMessage> marshaller;
+
+    private ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;
+
+    @Before
+    public void setup() throws Exception {
+
+        final ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+        marshaller = protocolContext.createMarshaller();
+        unmarshaller = protocolContext.createUnmarshaller();
+
+        ServerSocketConfiguration configuration = new 
ServerSocketConfiguration();
+        configuration.setSocketTimeout(1000);
+
+        listener = new SocketProtocolListener(5, 0, configuration, 
protocolContext);
+        listener.start();
+
+        int port = listener.getPort();
+
+        SocketConfiguration config = new SocketConfiguration();
+        config.setReuseAddress(true);
+        config.setSocketTimeout(1000);
+        socket = SocketUtils.createSocket(new InetSocketAddress("localhost", 
port), config);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    @Test
+    public void testBadRequest() throws Exception {
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
+        listener.addHandler(handler);
+        socket.getOutputStream().write(5);
+        Thread.sleep(250);
+        assertEquals(0, handler.getMessages().size());
+    }
+
+    @Test
+    public void testRequest() throws Exception {
+        ProtocolMessage msg = new PingMessage();
+
+        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
+        listener.addHandler(handler);
+
+        // marshal message to output stream
+        marshaller.marshal(msg, socket.getOutputStream());
+
+        // unmarshall response and return
+        ProtocolMessage response = 
unmarshaller.unmarshal(socket.getInputStream());
+        assertEquals(msg.getType(), response.getType());
+
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+    }
+
+    @Test
+    public void testDelayedRequest() throws Exception {
+        ProtocolMessage msg = new PingMessage();
+
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(2000);
+        listener.addHandler(handler);
+
+        // marshal message to output stream
+        marshaller.marshal(msg, socket.getOutputStream());
+
+        try {
+            socket.getInputStream().read();
+            fail("Socket timeout not received.");
+        } catch (SocketTimeoutException ste) {
+        }
+
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
new file mode 100644
index 0000000..2f16777
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.testutils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * @author unattributed
+ */
+public class DelayedProtocolHandler implements ProtocolHandler {
+
+    private int delay = 0;
+    private List<ProtocolMessage> messages = new ArrayList<>();
+
+    public DelayedProtocolHandler(int delay) {
+        this.delay = delay;
+    }
+
+    @Override
+    public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+        try {
+            messages.add(msg);
+            Thread.sleep(delay);
+            return null;
+        } catch (final InterruptedException ie) {
+            throw new ProtocolException(ie);
+        }
+
+    }
+
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return true;
+    }
+
+    public List<ProtocolMessage> getMessages() {
+        return messages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
new file mode 100644
index 0000000..e80f52c
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.testutils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * @author unattributed
+ */
+public class ReflexiveProtocolHandler implements ProtocolHandler {
+
+    private List<ProtocolMessage> messages = new ArrayList<>();
+
+    @Override
+    public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+        messages.add(msg);
+        return msg;
+    }
+
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return true;
+    }
+
+    public List<ProtocolMessage> getMessages() {
+        return messages;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..92eb78c
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration scan="true" scanPeriod="30 seconds">
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
+        </encoder>
+    </appender>
+    
+    <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
+    <logger name="org.apache.nifi" level="INFO"/>
+    
+    <!-- Logger for managing logging statements for nifi clusters. -->
+    <logger name="org.apache.nifi.cluster" level="INFO"/>
+
+    <!-- 
+        Logger for logging HTTP requests received by the web server.  Setting
+        log level to 'debug' activates HTTP request logging.
+    -->
+    <logger name="org.apache.nifi.server.JettyServer" level="INFO"/>
+
+    <!-- Logger for managing logging statements for jetty -->
+    <logger name="org.mortbay" level="INFO"/>
+
+    <!-- Suppress non-error messages due to excessive logging by class -->
+    <logger name="com.sun.jersey.spi.container.servlet.WebComponent" 
level="ERROR"/>
+
+    <logger name="org.apache.nifi.processors.standard" level="DEBUG"/>
+
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+    </root>
+    
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt
new file mode 100755
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
 
b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
new file mode 100755
index 0000000..e8e4c2b
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
@@ -0,0 +1,12 @@
+
+bad data should be skipped
+
+# this is a comment
+  2.2.2.2  # this is another comment ####
+3.3.3.3/8
+
+4.4.4.4/24
+
+5.5.5.255/31
+
+more bad data
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/.gitignore
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/.gitignore 
b/nar-bundles/framework-bundle/framework/core-api/.gitignore
new file mode 100755
index 0000000..ea8c4bf
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/pom.xml 
b/nar-bundles/framework-bundle/framework/core-api/pom.xml
new file mode 100644
index 0000000..b163cd0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core-api/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-framework-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>core-api</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>NiFi Core API</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-nar</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>remote-communications-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-runtime</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>client-dto</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.quartz-scheduler</groupId>
+            <artifactId>quartz</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
new file mode 100644
index 0000000..0092f7a
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+public class AdaptedNodeInformation {
+
+    private String hostname;
+    private Integer siteToSitePort;
+    private int apiPort;
+    private boolean isSiteToSiteSecure;
+    private int totalFlowFiles;
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public void setHostname(String hostname) {
+        this.hostname = hostname;
+    }
+
+    public Integer getSiteToSitePort() {
+        return siteToSitePort;
+    }
+
+    public void setSiteToSitePort(Integer siteToSitePort) {
+        this.siteToSitePort = siteToSitePort;
+    }
+
+    public int getApiPort() {
+        return apiPort;
+    }
+
+    public void setApiPort(int apiPort) {
+        this.apiPort = apiPort;
+    }
+
+    public boolean isSiteToSiteSecure() {
+        return isSiteToSiteSecure;
+    }
+
+    public void setSiteToSiteSecure(boolean isSiteToSiteSecure) {
+        this.isSiteToSiteSecure = isSiteToSiteSecure;
+    }
+
+    public int getTotalFlowFiles() {
+        return totalFlowFiles;
+    }
+
+    public void setTotalFlowFiles(int totalFlowFiles) {
+        this.totalFlowFiles = totalFlowFiles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
new file mode 100644
index 0000000..5751c32
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+@XmlRootElement
+public class ClusterNodeInformation {
+
+    private Collection<NodeInformation> nodeInfo;
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = 
JAXBContext.newInstance(ClusterNodeInformation.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+    public ClusterNodeInformation() {
+        this.nodeInfo = null;
+    }
+
+    public void setNodeInformation(final Collection<NodeInformation> nodeInfo) 
{
+        this.nodeInfo = nodeInfo;
+    }
+
+    @XmlJavaTypeAdapter(NodeInformationAdapter.class)
+    public Collection<NodeInformation> getNodeInformation() {
+        return nodeInfo;
+    }
+
+    public void marshal(final OutputStream os) throws JAXBException {
+        final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+        marshaller.marshal(this, os);
+    }
+
+    public static ClusterNodeInformation unmarshal(final InputStream is) 
throws JAXBException {
+        final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+        return (ClusterNodeInformation) unmarshaller.unmarshal(is);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
new file mode 100644
index 0000000..987ff65
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+public interface NodeInformant {
+
+    ClusterNodeInformation getNodeInformation();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
new file mode 100644
index 0000000..848eb7e
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+public class NodeInformation {
+
+    private final String hostname;
+    private final Integer siteToSitePort;
+    private final int apiPort;
+    private final boolean isSiteToSiteSecure;
+    private final int totalFlowFiles;
+
+    public NodeInformation(final String hostname, final Integer 
siteToSitePort, final int apiPort,
+            final boolean isSiteToSiteSecure, final int totalFlowFiles) {
+        this.hostname = hostname;
+        this.siteToSitePort = siteToSitePort;
+        this.apiPort = apiPort;
+        this.isSiteToSiteSecure = isSiteToSiteSecure;
+        this.totalFlowFiles = totalFlowFiles;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getAPIPort() {
+        return apiPort;
+    }
+
+    public Integer getSiteToSitePort() {
+        return siteToSitePort;
+    }
+
+    public boolean isSiteToSiteSecure() {
+        return isSiteToSiteSecure;
+    }
+
+    public int getTotalFlowFiles() {
+        return totalFlowFiles;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof NodeInformation)) {
+            return false;
+        }
+
+        final NodeInformation other = (NodeInformation) obj;
+        if (!hostname.equals(other.hostname)) {
+            return false;
+        }
+        if (siteToSitePort == null && other.siteToSitePort != null) {
+            return false;
+        }
+        if (siteToSitePort != null && other.siteToSitePort == null) {
+            return false;
+        } else if (siteToSitePort != null && siteToSitePort.intValue() != 
other.siteToSitePort.intValue()) {
+            return false;
+        }
+        if (apiPort != other.apiPort) {
+            return false;
+        }
+        if (isSiteToSiteSecure != other.isSiteToSiteSecure) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : 
siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0);
+    }
+
+    @Override
+    public String toString() {
+        return "Node[" + hostname + ":" + apiPort + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
new file mode 100644
index 0000000..630631f
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+
+public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, 
NodeInformation> {
+
+    @Override
+    public NodeInformation unmarshal(final AdaptedNodeInformation adapted) 
throws Exception {
+        return new NodeInformation(adapted.getHostname(), 
adapted.getSiteToSitePort(), adapted.getApiPort(), 
adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles());
+    }
+
+    @Override
+    public AdaptedNodeInformation marshal(final NodeInformation 
nodeInformation) throws Exception {
+        final AdaptedNodeInformation adapted = new AdaptedNodeInformation();
+        adapted.setHostname(nodeInformation.getHostname());
+        adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort());
+        adapted.setApiPort(nodeInformation.getAPIPort());
+        adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure());
+        adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles());
+        return adapted;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
new file mode 100644
index 0000000..57c1c30
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+public interface DataFlow {
+
+    /**
+     * @return the raw byte array of the flow
+     */
+    public byte[] getFlow();
+
+    /**
+     * @return the raw byte array of the templates
+     */
+    public byte[] getTemplates();
+
+    /**
+     * @return the raw byte array of the snippets
+     */
+    public byte[] getSnippets();
+
+    /**
+     * @return true if processors should be automatically started at 
application
+     * startup; false otherwise
+     */
+    public boolean isAutoStartProcessors();
+}

Reply via email to