http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
new file mode 100644
index 0000000..35380dd
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Encapsulates an HTTP request. The toString method returns the
+ * specification-compliant request.
+ *
+ * @author unattributed
+ */
+public class HttpRequest {
+
+    private String method;
+    private String uri;
+    private String rawUri;
+    private String version;
+    private String body;
+    private String rawRequest;
+    private Map<String, String> headers = new HashMap<>();
+    private Map<String, List<String>> parameters = new HashMap<>();
+
+    public static HttpRequestBuilder createFromRequestLine(final String 
requestLine) {
+        return new HttpRequestBuilder(requestLine);
+    }
+
+    public String getBody() {
+        return body;
+    }
+
+    public Map<String, String> getHeaders() {
+        return Collections.unmodifiableMap(headers);
+    }
+
+    public String getHeaderValue(final String header) {
+        for (final Map.Entry<String, String> entry : getHeaders().entrySet()) {
+            if (entry.getKey().equalsIgnoreCase(header)) {
+                return entry.getValue();
+            }
+        }
+        return null;
+    }
+
+    public String getMethod() {
+        return method;
+    }
+
+    public Map<String, List<String>> getParameters() {
+        final Map<String, List<String>> result = new HashMap<>();
+        for (final Map.Entry<String, List<String>> entry : 
parameters.entrySet()) {
+            result.put(entry.getKey(), 
Collections.unmodifiableList(entry.getValue()));
+        }
+        return Collections.unmodifiableMap(result);
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public String getRawUri() {
+        return rawUri;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    @Override
+    public String toString() {
+        return rawRequest;
+    }
+
+    /**
+     * A builder for constructing basic HTTP requests. It handles only enough 
of
+     * the HTTP specification to support basic unit testing, and it should not
+     * be used otherwise.
+     */
+    public static class HttpRequestBuilder {
+
+        private String method;
+        private String uri;
+        private String rawUri;
+        private String version;
+        private Map<String, String> headers = new HashMap<>();
+        private Map<String, List<String>> parameters = new HashMap<>();
+        private int contentLength = 0;
+        private String contentType;
+        private String body = "";
+        private StringBuilder rawRequest = new StringBuilder();
+
+        private HttpRequestBuilder(final String requestLine) {
+
+            final String[] tokens = requestLine.split(" ");
+            if (tokens.length != 3) {
+                throw new IllegalArgumentException("Invalid HTTP Request Line: 
" + requestLine);
+            }
+
+            method = tokens[0];
+            if (HttpMethod.GET.equalsIgnoreCase(method) || 
HttpMethod.HEAD.equalsIgnoreCase(method) || 
HttpMethod.DELETE.equalsIgnoreCase(method) || 
HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
+                final int queryIndex = tokens[1].indexOf("?");
+                if (queryIndex > -1) {
+                    uri = tokens[1].substring(0, queryIndex);
+                    addParameters(tokens[1].substring(queryIndex + 1));
+                } else {
+                    uri = tokens[1];
+                }
+            }
+            rawUri = tokens[1];
+            version = tokens[2];
+            rawRequest.append(requestLine).append("\n");
+        }
+
+        private void addHeader(final String key, final String value) {
+            if (key.contains(" ")) {
+                throw new IllegalArgumentException("Header key may not contain 
spaces.");
+            } else if ("content-length".equalsIgnoreCase(key)) {
+                contentLength = (StringUtils.isBlank(value.trim())) ? 0 : 
Integer.parseInt(value.trim());
+            } else if ("content-type".equalsIgnoreCase(key)) {
+                contentType = value.trim();
+            }
+            headers.put(key, value);
+        }
+
+        public void addHeader(final String header) {
+            final int firstColonIndex = header.indexOf(":");
+            if (firstColonIndex < 0) {
+                throw new IllegalArgumentException("Invalid HTTP Header line: 
" + header);
+            }
+            addHeader(header.substring(0, firstColonIndex), 
header.substring(firstColonIndex + 1));
+            rawRequest.append(header).append("\n");
+        }
+
+        // final because constructor calls it
+        public final void addParameters(final String queryString) {
+
+            if (StringUtils.isBlank(queryString)) {
+                return;
+            }
+
+            final String normQueryString;
+            if (queryString.startsWith("?")) {
+                normQueryString = queryString.substring(1);
+            } else {
+                normQueryString = queryString;
+            }
+            final String[] keyValuePairs = normQueryString.split("&");
+            for (final String keyValuePair : keyValuePairs) {
+                final String[] keyValueTokens = keyValuePair.split("=");
+                try {
+                    addParameter(
+                            URLDecoder.decode(keyValueTokens[0], "utf-8"),
+                            URLDecoder.decode(keyValueTokens[1], "utf-8")
+                    );
+                } catch (UnsupportedEncodingException use) {
+                    throw new RuntimeException(use);
+                }
+            }
+        }
+
+        public void addParameter(final String key, final String value) {
+
+            if (key.contains(" ")) {
+                throw new IllegalArgumentException("Parameter key may not 
contain spaces: " + key);
+            }
+
+            final List<String> values;
+            if (parameters.containsKey(key)) {
+                values = parameters.get(key);
+            } else {
+                values = new ArrayList<>();
+                parameters.put(key, values);
+            }
+            values.add(value);
+        }
+
+        public void addBody(final Reader reader) throws IOException {
+
+            if (contentLength <= 0) {
+                return;
+            }
+
+            final char[] buf = new char[contentLength];
+            int offset = 0;
+            int numRead = 0;
+            while (offset < buf.length && (numRead = reader.read(buf, offset, 
buf.length - offset)) >= 0) {
+                offset += numRead;
+            }
+            body = new String(buf);
+            rawRequest.append("\n");
+            rawRequest.append(body);
+        }
+
+        public HttpRequest build() throws UnsupportedEncodingException {
+
+            if (HttpMethod.GET.equalsIgnoreCase(method) == false && 
HttpMethod.HEAD.equalsIgnoreCase(method) == false && 
contentType.equalsIgnoreCase(MediaType.APPLICATION_FORM_URLENCODED)) {
+                addParameters(body);
+            }
+
+            final HttpRequest request = new HttpRequest();
+            request.method = this.method;
+            request.uri = this.uri;
+            request.rawUri = this.rawUri;
+            request.version = this.version;
+            request.headers.putAll(this.headers);
+            request.parameters.putAll(this.parameters);
+            request.body = this.body;
+            request.rawRequest = this.rawRequest.toString();
+
+            return request;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
new file mode 100644
index 0000000..3aa2931
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.ws.rs.core.Response.Status;
+
+/**
+ * Encapsulates an HTTP response. The toString method returns the
+ * specification-compliant response.
+ *
+ * @author unattributed
+ */
+public class HttpResponse {
+
+    private final Status status;
+    private final String entity;
+    private final Map<String, String> headers = new HashMap<>();
+
+    public HttpResponse(final Status status, final String entity) {
+        this.status = status;
+        this.entity = entity;
+        headers.put("content-length", 
String.valueOf(entity.getBytes().length));
+    }
+
+    public String getEntity() {
+        return entity;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    public Map<String, String> getHeaders() {
+        return Collections.unmodifiableMap(headers);
+    }
+
+    public void addHeader(final String key, final String value) {
+        if (key.contains(" ")) {
+            throw new IllegalArgumentException("Header key may not contain 
spaces.");
+        } else if ("content-length".equalsIgnoreCase(key)) {
+            throw new IllegalArgumentException("Content-Length header is set 
automatically based on length of content.");
+        }
+        headers.put(key, value);
+    }
+
+    public void addHeaders(final Map<String, String> headers) {
+        for (final Map.Entry<String, String> entry : headers.entrySet()) {
+            addHeader(entry.getKey(), entry.getValue());
+        }
+    }
+
+    @Override
+    public String toString() {
+
+        final StringBuilder strb = new StringBuilder();
+
+        // response line
+        strb.append("HTTP/1.1 ")
+                .append(status.getStatusCode())
+                .append(" ")
+                .append(status.getReasonPhrase())
+                .append("\n");
+
+        // headers
+        for (final Map.Entry<String, String> entry : headers.entrySet()) {
+            strb.append(entry.getKey()).append(": 
").append(entry.getValue()).append("\n");
+        }
+
+        strb.append("\n");
+
+        // body
+        strb.append(entity);
+
+        return strb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
new file mode 100644
index 0000000..28615d0
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+/**
+ * Wraps a HttpResponse with a time-delay. When the action is applied, the
+ * currently executing thread sleeps for the given delay before returning the
+ * response to the caller.
+ *
+ * This class is good for simulating network latency.
+ *
+ * @author unattributed
+ */
+public class HttpResponseAction {
+
+    private final HttpResponse response;
+
+    private final int waitTimeMs;
+
+    public HttpResponseAction(final HttpResponse response) {
+        this(response, 0);
+    }
+
+    public HttpResponseAction(final HttpResponse response, final int 
waitTimeMs) {
+        this.response = response;
+        this.waitTimeMs = waitTimeMs;
+    }
+
+    public HttpResponse apply() {
+        try {
+            Thread.sleep(waitTimeMs);
+        } catch (final InterruptedException ie) {
+            throw new RuntimeException(ie);
+        }
+
+        return response;
+    }
+
+    public HttpResponse getResponse() {
+        return response;
+    }
+
+    public int getWaitTimeMs() {
+        return waitTimeMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
new file mode 100644
index 0000000..f17a66c
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.testutils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.nifi.cluster.manager.testutils.HttpRequest.HttpRequestBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple HTTP web server that allows clients to register canned-responses to
+ * respond to received requests.
+ *
+ * @author unattributed
+ */
+public class HttpServer {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpServer.class);
+
+    private final ExecutorService executorService;
+    private final ServerSocket serverSocket;
+    private final Queue<HttpResponseAction> responseQueue = new 
ConcurrentLinkedQueue<>();
+    private final Map<String, String> checkedHeaders = new HashMap<>();
+    private final Map<String, List<String>> checkedParameters = new 
HashMap<>();
+    private final int port;
+
+    public HttpServer(int numThreads, int port) throws IOException {
+        this.port = port;
+        executorService = Executors.newFixedThreadPool(numThreads);
+        serverSocket = new ServerSocket(port);
+    }
+
+    public void start() {
+
+        new Thread() {
+            @Override
+            public void run() {
+                while (isRunning()) {
+                    try {
+                        final Socket conn = serverSocket.accept();
+                        executorService.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                handleRequest(conn);
+                                if (conn.isClosed() == false) {
+                                    try {
+                                        conn.close();
+                                    } catch (IOException ioe) {
+                                    }
+                                }
+                            }
+                        });
+                    } catch (final SocketException se) {
+                        /* ignored */
+                    } catch (final IOException ioe) {
+                        if (logger.isDebugEnabled()) {
+                            logger.warn("", ioe);
+                        }
+                    }
+                }
+            }
+        ;
+    }
+
+    .start();
+    }
+
+    public boolean isRunning() {
+        return executorService.isShutdown() == false;
+    }
+
+    public void stop() {
+        // shutdown server socket
+        try {
+            if (serverSocket.isClosed() == false) {
+                serverSocket.close();
+            }
+        } catch (final Exception ex) {
+            throw new RuntimeException(ex);
+        }
+
+        // shutdown executor service
+        try {
+            executorService.shutdown();
+            executorService.awaitTermination(3, TimeUnit.SECONDS);
+        } catch (final Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    public int getPort() {
+        if (isRunning()) {
+            return serverSocket.getLocalPort();
+        } else {
+            return port;
+        }
+    }
+
+    public Queue<HttpResponseAction> addResponseAction(final 
HttpResponseAction response) {
+        responseQueue.add(response);
+        return responseQueue;
+    }
+
+    public void addCheckedHeaders(final Map<String, String> headers) {
+        checkedHeaders.putAll(headers);
+    }
+
+    public void addCheckedParameters(final Map<String, List<String>> 
parameters) {
+        checkedParameters.putAll(parameters);
+    }
+
+    private void handleRequest(final Socket conn) {
+        try {
+
+            final HttpRequest httpRequest = 
buildRequest(conn.getInputStream());
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("\n" + httpRequest);
+            }
+
+            // check headers
+            final Map<String, String> reqHeaders = httpRequest.getHeaders();
+            for (final Map.Entry<String, String> entry : 
checkedHeaders.entrySet()) {
+                if (reqHeaders.containsKey(entry.getKey())) {
+                    if 
(entry.getValue().equals(reqHeaders.get(entry.getKey()))) {
+                        logger.error("Incorrect HTTP request header value 
received for checked header: " + entry.getKey());
+                        conn.close();
+                        return;
+                    }
+                } else {
+                    logger.error("Missing checked header: " + entry.getKey());
+                    conn.close();
+                    return;
+                }
+            }
+
+            // check parameters
+            final Map<String, List<String>> reqParams = 
httpRequest.getParameters();
+            for (final Map.Entry<String, List<String>> entry : 
checkedParameters.entrySet()) {
+                if (reqParams.containsKey(entry.getKey())) {
+                    if (entry.getValue().equals(reqParams.get(entry.getKey())) 
== false) {
+                        logger.error("Incorrect HTTP request parameter values 
received for checked parameter: " + entry.getKey());
+                        conn.close();
+                        return;
+                    }
+                } else {
+                    logger.error("Missing checked parameter: " + 
entry.getKey());
+                    conn.close();
+                    return;
+                }
+            }
+
+            // apply the next response
+            final HttpResponseAction response = responseQueue.remove();
+            response.apply();
+
+            // send the response to client
+            final PrintWriter pw = new PrintWriter(conn.getOutputStream(), 
true);
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("\n" + response.getResponse());
+            }
+
+            pw.print(response.getResponse());
+            pw.flush();
+
+        } catch (IOException ioe) { /* ignored */ }
+    }
+
+    private HttpRequest buildRequest(final InputStream requestIs) throws 
IOException {
+        return new HttpRequestReader().read(new InputStreamReader(requestIs));
+    }
+
+    // reads an HTTP request from the given reader
+    private class HttpRequestReader {
+
+        public HttpRequest read(final Reader reader) throws IOException {
+
+            HttpRequestBuilder builder = null;
+            String line = "";
+            boolean isRequestLine = true;
+            while ((line = readLine(reader)).isEmpty() == false) {
+                if (isRequestLine) {
+                    builder = HttpRequest.createFromRequestLine(line);
+                    isRequestLine = false;
+                } else {
+                    builder.addHeader(line);
+                }
+            }
+
+            if (builder != null) {
+                builder.addBody(reader);
+            }
+
+            return builder.build();
+        }
+
+        private String readLine(final Reader reader) throws IOException {
+
+            /* read character at time to prevent blocking */
+            final StringBuilder strb = new StringBuilder();
+            char c;
+            while ((c = (char) reader.read()) != '\n') {
+                if (c != '\r') {
+                    strb.append(c);
+                }
+            }
+            return strb.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
new file mode 100644
index 0000000..96943c2
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * @author unattributed
+ */
+public class ClusterManagerProtocolSenderImplTest {
+
+    private InetAddress address;
+
+    private int port;
+
+    private SocketProtocolListener listener;
+
+    private ClusterManagerProtocolSenderImpl sender;
+
+    private ProtocolHandler mockHandler;
+
+    @Before
+    public void setup() throws IOException {
+
+        address = InetAddress.getLocalHost();
+        ServerSocketConfiguration serverSocketConfiguration = new 
ServerSocketConfiguration();
+
+        mockHandler = mock(ProtocolHandler.class);
+
+        ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, 
protocolContext);
+        listener.addHandler(mockHandler);
+        listener.start();
+
+        port = listener.getPort();
+
+        SocketConfiguration socketConfiguration = new SocketConfiguration();
+        sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, 
protocolContext);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        if (listener.isRunning()) {
+            listener.stop();
+        }
+    }
+
+    @Test
+    public void testRequestFlow() throws Exception {
+
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
FlowResponseMessage());
+        FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
+        FlowResponseMessage response = sender.requestFlow(request);
+        assertNotNull(response);
+    }
+
+    @Test
+    public void testRequestFlowWithBadResponseMessage() throws Exception {
+
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
PingMessage());
+        FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
+        try {
+            sender.requestFlow(request);
+            fail("failed to throw exception");
+        } catch (ProtocolException pe) {
+        }
+
+    }
+
+    @Test
+    public void testRequestFlowDelayedResponse() throws Exception {
+
+        final int time = 250;
+        sender.getSocketConfiguration().setSocketTimeout(time);
+
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new 
Answer<FlowResponseMessage>() {
+            @Override
+            public FlowResponseMessage answer(InvocationOnMock invocation) 
throws Throwable {
+                Thread.sleep(time * 3);
+                return new FlowResponseMessage();
+            }
+        });
+        FlowRequestMessage request = new FlowRequestMessage();
+        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
address.getHostAddress(), port));
+        try {
+            sender.requestFlow(request);
+            fail("failed to throw exception");
+        } catch (ProtocolException pe) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
new file mode 100644
index 0000000..4a69571
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+import org.mockito.stubbing.OngoingStubbing;
+
+public class ClusterServiceLocatorTest {
+
+    private ClusterServiceDiscovery mockServiceDiscovery;
+
+    private int fixedPort;
+
+    private DiscoverableService fixedService;
+
+    private ClusterServiceLocator serviceDiscoveryLocator;
+
+    private ClusterServiceLocator serviceDiscoveryFixedPortLocator;
+
+    private ClusterServiceLocator fixedServiceLocator;
+
+    @Before
+    public void setup() throws Exception {
+
+        fixedPort = 1;
+        mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
+        fixedService = new DiscoverableServiceImpl("some-service", 
InetSocketAddress.createUnresolved("some-host", 20));
+
+        serviceDiscoveryLocator = new 
ClusterServiceLocator(mockServiceDiscovery);
+        serviceDiscoveryFixedPortLocator = new 
ClusterServiceLocator(mockServiceDiscovery, fixedPort);
+        fixedServiceLocator = new ClusterServiceLocator(fixedService);
+
+    }
+
+    @Test
+    public void getServiceWhenServiceDiscoveryNotStarted() {
+        assertNull(serviceDiscoveryLocator.getService());
+    }
+
+    @Test
+    public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
+        assertNull(serviceDiscoveryLocator.getService());
+    }
+
+    @Test
+    public void getServiceWhenFixedServiceNotStarted() {
+        assertEquals(fixedService, fixedServiceLocator.getService());
+    }
+
+    @Test
+    public void getServiceNotOnFirstAttempt() {
+
+        ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
+        config.setNumAttempts(2);
+        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+        config.setTimeBetweenAttempts(1);
+
+        serviceDiscoveryLocator.setAttemptsConfig(config);
+
+        OngoingStubbing<DiscoverableService> stubbing = null;
+        for (int i = 0; i < config.getNumAttempts() - 1; i++) {
+            if (stubbing == null) {
+                stubbing = 
when(mockServiceDiscovery.getService()).thenReturn(null);
+            } else {
+                stubbing.thenReturn(null);
+            }
+        }
+        stubbing.thenReturn(fixedService);
+
+        assertEquals(fixedService, serviceDiscoveryLocator.getService());
+
+    }
+
+    @Test
+    public void getServiceNotOnFirstAttemptWithFixedPort() {
+
+        ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
+        config.setNumAttempts(2);
+        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+        config.setTimeBetweenAttempts(1);
+
+        serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);
+
+        OngoingStubbing<DiscoverableService> stubbing = null;
+        for (int i = 0; i < config.getNumAttempts() - 1; i++) {
+            if (stubbing == null) {
+                stubbing = 
when(mockServiceDiscovery.getService()).thenReturn(null);
+            } else {
+                stubbing.thenReturn(null);
+            }
+        }
+        stubbing.thenReturn(fixedService);
+
+        InetSocketAddress resultAddress = 
InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(),
 fixedPort);
+        DiscoverableService resultService = new 
DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
+        assertEquals(resultService, 
serviceDiscoveryFixedPortLocator.getService());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
new file mode 100644
index 0000000..4d85d1a
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.net.InetSocketAddress;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author unattributed
+ */
+public class ClusterServicesBroadcasterTest {
+
+    private ClusterServicesBroadcaster broadcaster;
+
+    private MulticastProtocolListener listener;
+
+    private DummyProtocolHandler handler;
+
+    private InetSocketAddress multicastAddress;
+
+    private DiscoverableService broadcastedService;
+
+    private ProtocolContext protocolContext;
+
+    private MulticastConfiguration configuration;
+
+    @Before
+    public void setup() throws Exception {
+
+        broadcastedService = new DiscoverableServiceImpl("some-service", new 
InetSocketAddress("localhost", 11111));
+
+        multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
+
+        configuration = new MulticastConfiguration();
+
+        protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        broadcaster = new ClusterServicesBroadcaster(multicastAddress, 
configuration, protocolContext, "500 ms");
+        broadcaster.addService(broadcastedService);
+
+        handler = new DummyProtocolHandler();
+        listener = new MulticastProtocolListener(5, multicastAddress, 
configuration, protocolContext);
+        listener.addHandler(handler);
+    }
+
+    @After
+    public void teardown() {
+
+        if (broadcaster.isRunning()) {
+            broadcaster.stop();
+        }
+
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace(System.out);
+        }
+
+    }
+
+    @Test
+    @Ignore
+    public void testBroadcastReceived() throws Exception {
+
+        broadcaster.start();
+        listener.start();
+
+        Thread.sleep(1000);
+
+        listener.stop();
+
+        assertNotNull(handler.getProtocolMessage());
+        assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, 
handler.getProtocolMessage().getType());
+        final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) 
handler.getProtocolMessage();
+        assertEquals(broadcastedService.getServiceName(), 
msg.getServiceName());
+        assertEquals(broadcastedService.getServiceAddress().getHostName(), 
msg.getAddress());
+        assertEquals(broadcastedService.getServiceAddress().getPort(), 
msg.getPort());
+    }
+
+    private class DummyProtocolHandler implements ProtocolHandler {
+
+        private ProtocolMessage protocolMessage;
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+            this.protocolMessage = msg;
+            return null;
+        }
+
+        public ProtocolMessage getProtocolMessage() {
+            return protocolMessage;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
new file mode 100644
index 0000000..6c79b90
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastUtils;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author unattributed
+ */
+public class MulticastProtocolListenerTest {
+
+    private MulticastProtocolListener listener;
+
+    private MulticastSocket socket;
+
+    private InetSocketAddress address;
+
+    private MulticastConfiguration configuration;
+
+    private ProtocolContext protocolContext;
+
+    @Before
+    public void setup() throws Exception {
+
+        address = new InetSocketAddress("226.1.1.1", 60000);
+        configuration = new MulticastConfiguration();
+
+        protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new MulticastProtocolListener(5, address, configuration, 
protocolContext);
+        listener.start();
+
+        socket = MulticastUtils.createMulticastSocket(address.getPort(), 
configuration);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } finally {
+            MulticastUtils.closeQuietly(socket);
+        }
+    }
+
+    @Ignore("This test must be reworked.  Requires an active network 
connection")
+    @Test
+    public void testBadRequest() throws Exception {
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
+        listener.addHandler(handler);
+        DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address);
+        socket.send(packet);
+        Thread.sleep(250);
+        assertEquals(0, handler.getMessages().size());
+    }
+
+    @Test
+    @Ignore
+    public void testRequest() throws Exception {
+
+        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
+        listener.addHandler(handler);
+
+        ProtocolMessage msg = new PingMessage();
+        MulticastProtocolMessage multicastMsg = new 
MulticastProtocolMessage("some-id", msg);
+
+        // marshal message to output stream
+        ProtocolMessageMarshaller marshaller = 
protocolContext.createMarshaller();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        marshaller.marshal(multicastMsg, baos);
+        byte[] requestPacketBytes = baos.toByteArray();
+        DatagramPacket packet = new DatagramPacket(requestPacketBytes, 
requestPacketBytes.length, address);
+        socket.send(packet);
+
+        Thread.sleep(250);
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+
+    }
+
+    private class ReflexiveProtocolHandler implements ProtocolHandler {
+
+        private List<ProtocolMessage> messages = new ArrayList<>();
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+            messages.add(msg);
+            return msg;
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        public List<ProtocolMessage> getMessages() {
+            return messages;
+        }
+
+    }
+
+    private class DelayedProtocolHandler implements ProtocolHandler {
+
+        private int delay = 0;
+
+        private List<ProtocolMessage> messages = new ArrayList<>();
+
+        public DelayedProtocolHandler(int delay) {
+            this.delay = delay;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+            try {
+                messages.add(msg);
+                Thread.sleep(delay);
+                return null;
+            } catch (final InterruptedException ie) {
+                throw new ProtocolException(ie);
+            }
+
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        public List<ProtocolMessage> getMessages() {
+            return messages;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
new file mode 100644
index 0000000..7c62d2f
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import 
org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * @author unattributed
+ */
+public class NodeProtocolSenderImplTest {
+
+    private SocketProtocolListener listener;
+
+    private NodeProtocolSenderImpl sender;
+
+    private DiscoverableService service;
+
+    private ServerSocketConfiguration serverSocketConfiguration;
+
+    private ClusterServiceLocator mockServiceLocator;
+
+    private ProtocolHandler mockHandler;
+
+    private NodeIdentifier nodeIdentifier;
+
+    @Before
+    public void setup() throws IOException {
+
+        serverSocketConfiguration = new ServerSocketConfiguration();
+
+        mockServiceLocator = mock(ClusterServiceLocator.class);
+        mockHandler = mock(ProtocolHandler.class);
+
+        nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, 
"localhost", 5678);
+
+        ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, 
protocolContext);
+        listener.setShutdownListenerSeconds(3);
+        listener.addHandler(mockHandler);
+        listener.start();
+
+        service = new DiscoverableServiceImpl("some-service", new 
InetSocketAddress("localhost", listener.getPort()));
+
+        SocketConfiguration socketConfiguration = new SocketConfiguration();
+        socketConfiguration.setReuseAddress(true);
+        sender = new NodeProtocolSenderImpl(mockServiceLocator, 
socketConfiguration, protocolContext);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        if (listener.isRunning()) {
+            listener.stop();
+        }
+    }
+
+    @Test
+    public void testConnect() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        ConnectionResponseMessage mockMessage = new 
ConnectionResponseMessage();
+        mockMessage.setConnectionResponse(new 
ConnectionResponse(nodeIdentifier, new 
StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, 
null, null, UUID.randomUUID().toString()));
+        
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
+
+        ConnectionRequestMessage request = new ConnectionRequestMessage();
+        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+        ConnectionResponseMessage response = sender.requestConnection(request);
+        assertNotNull(response);
+    }
+
+    @Test(expected = UnknownServiceAddressException.class)
+    public void testConnectNoClusterManagerAddress() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(null);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
ConnectionResponseMessage());
+
+        ConnectionRequestMessage request = new ConnectionRequestMessage();
+        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+
+        sender.requestConnection(request);
+        fail("failed to throw exception");
+    }
+
+    @Test(expected = ProtocolException.class)
+    public void testConnectBadResponse() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
PingMessage());
+
+        ConnectionRequestMessage request = new ConnectionRequestMessage();
+        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+
+        sender.requestConnection(request);
+        fail("failed to throw exception");
+
+    }
+
+    @Test(expected = ProtocolException.class)
+    public void testConnectDelayedResponse() throws Exception {
+
+        final int time = 250;
+        sender.getSocketConfiguration().setSocketTimeout(time);
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new 
Answer<ConnectionResponseMessage>() {
+            @Override
+            public ConnectionResponseMessage answer(InvocationOnMock 
invocation) throws Throwable {
+                Thread.sleep(time * 3);
+                return new ConnectionResponseMessage();
+            }
+        });
+        ConnectionRequestMessage request = new ConnectionRequestMessage();
+        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+
+        sender.requestConnection(request);
+        fail("failed to throw exception");
+
+    }
+
+    @Test
+    public void testHeartbeat() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
+
+        HeartbeatMessage msg = new HeartbeatMessage();
+        HeartbeatPayload hbPayload = new HeartbeatPayload();
+        Heartbeat hb = new Heartbeat(new NodeIdentifier("id", "localhost", 3, 
"localhost", 4), false, false, hbPayload.marshal());
+        msg.setHeartbeat(hb);
+        sender.heartbeat(msg);
+    }
+
+    @Test
+    public void testNotifyControllerStartupFailure() throws Exception {
+
+        when(mockServiceLocator.getService()).thenReturn(service);
+        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
+
+        ControllerStartupFailureMessage msg = new 
ControllerStartupFailureMessage();
+        msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, 
"some-addr", 1));
+        msg.setExceptionMessage("some exception");
+        sender.notifyControllerStartupFailure(msg);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
new file mode 100644
index 0000000..92a7d2a
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler;
+import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author unattributed
+ */
+public class SocketProtocolListenerTest {
+
+    private SocketProtocolListener listener;
+
+    private Socket socket;
+
+    private ProtocolMessageMarshaller<ProtocolMessage> marshaller;
+
+    private ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;
+
+    @Before
+    public void setup() throws Exception {
+
+        final ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+        marshaller = protocolContext.createMarshaller();
+        unmarshaller = protocolContext.createUnmarshaller();
+
+        ServerSocketConfiguration configuration = new 
ServerSocketConfiguration();
+        configuration.setSocketTimeout(1000);
+
+        listener = new SocketProtocolListener(5, 0, configuration, 
protocolContext);
+        listener.start();
+
+        int port = listener.getPort();
+
+        SocketConfiguration config = new SocketConfiguration();
+        config.setReuseAddress(true);
+        config.setSocketTimeout(1000);
+        socket = SocketUtils.createSocket(new InetSocketAddress("localhost", 
port), config);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    @Test
+    public void testBadRequest() throws Exception {
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
+        listener.addHandler(handler);
+        socket.getOutputStream().write(5);
+        Thread.sleep(250);
+        assertEquals(0, handler.getMessages().size());
+    }
+
+    @Test
+    public void testRequest() throws Exception {
+        ProtocolMessage msg = new PingMessage();
+
+        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
+        listener.addHandler(handler);
+
+        // marshal message to output stream
+        marshaller.marshal(msg, socket.getOutputStream());
+
+        // unmarshall response and return
+        ProtocolMessage response = 
unmarshaller.unmarshal(socket.getInputStream());
+        assertEquals(msg.getType(), response.getType());
+
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+    }
+
+    @Test
+    public void testDelayedRequest() throws Exception {
+        ProtocolMessage msg = new PingMessage();
+
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(2000);
+        listener.addHandler(handler);
+
+        // marshal message to output stream
+        marshaller.marshal(msg, socket.getOutputStream());
+
+        try {
+            socket.getInputStream().read();
+            fail("Socket timeout not received.");
+        } catch (SocketTimeoutException ste) {
+        }
+
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
new file mode 100644
index 0000000..2f16777
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.testutils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * @author unattributed
+ */
+public class DelayedProtocolHandler implements ProtocolHandler {
+
+    private int delay = 0;
+    private List<ProtocolMessage> messages = new ArrayList<>();
+
+    public DelayedProtocolHandler(int delay) {
+        this.delay = delay;
+    }
+
+    @Override
+    public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+        try {
+            messages.add(msg);
+            Thread.sleep(delay);
+            return null;
+        } catch (final InterruptedException ie) {
+            throw new ProtocolException(ie);
+        }
+
+    }
+
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return true;
+    }
+
+    public List<ProtocolMessage> getMessages() {
+        return messages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
new file mode 100644
index 0000000..e80f52c
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.testutils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * @author unattributed
+ */
+public class ReflexiveProtocolHandler implements ProtocolHandler {
+
+    private List<ProtocolMessage> messages = new ArrayList<>();
+
+    @Override
+    public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
+        messages.add(msg);
+        return msg;
+    }
+
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return true;
+    }
+
+    public List<ProtocolMessage> getMessages() {
+        return messages;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..92eb78c
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/logback-test.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration scan="true" scanPeriod="30 seconds">
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
+        </encoder>
+    </appender>
+    
+    <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
+    <logger name="org.apache.nifi" level="INFO"/>
+    
+    <!-- Logger for managing logging statements for nifi clusters. -->
+    <logger name="org.apache.nifi.cluster" level="INFO"/>
+
+    <!-- 
+        Logger for logging HTTP requests received by the web server.  Setting
+        log level to 'debug' activates HTTP request logging.
+    -->
+    <logger name="org.apache.nifi.server.JettyServer" level="INFO"/>
+
+    <!-- Logger for managing logging statements for jetty -->
+    <logger name="org.mortbay" level="INFO"/>
+
+    <!-- Suppress non-error messages due to excessive logging by class -->
+    <logger name="com.sun.jersey.spi.container.servlet.WebComponent" 
level="ERROR"/>
+
+    <logger name="org.apache.nifi.processors.standard" level="DEBUG"/>
+
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+    </root>
+    
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt
new file mode 100755
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
new file mode 100755
index 0000000..e8e4c2b
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
@@ -0,0 +1,12 @@
+
+bad data should be skipped
+
+# this is a comment
+  2.2.2.2  # this is another comment ####
+3.3.3.3/8
+
+4.4.4.4/24
+
+5.5.5.255/31
+
+more bad data
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index bb53aff..f2bf6c9 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -64,8 +64,8 @@
             <artifactId>nifi-client-dto</artifactId>
         </dependency>
         <dependency>
-               <groupId>org.apache.nifi</groupId>
-               <artifactId>nifi-site-to-site-client</artifactId>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
         </dependency>
         <dependency>
             <groupId>org.quartz-scheduler</groupId>
@@ -121,4 +121,19 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>src/test/resources/conf/0bytes.xml</exclude>
+                        
<exclude>src/test/resources/conf/termination-only.xml</exclude>
+                        <exclude>src/test/resources/hello.txt</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
index 60eda8a..6b587fd 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
@@ -588,6 +588,19 @@
                             </packagingIncludes>
                         </configuration>
                     </plugin>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <configuration>
+                            <excludes>
+                                <exclude>src/main/webapp/js/json2.js</exclude>
+                                <exclude>src/main/webapp/js/jquery/</exclude>
+                                
<exclude>src/main/webapp/js/d3/d3.min.js</exclude>
+                                
<exclude>src/main/webapp/js/codemirror/</exclude>
+                                
<exclude>src/main/webapp/css/reset.css</exclude>
+                            </excludes>
+                        </configuration>
+                    </plugin>
                 </plugins>
             </build>
         </profile>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml
index 544dd77..2aaa045 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/pom.xml
@@ -30,9 +30,9 @@
         <module>nifi-security</module>
         <module>nifi-site-to-site</module>
         <module>nifi-framework-core</module>
-        <module>nifi-cluster-protocol</module>
-        <module>nifi-cluster-web</module>
-        <module>nifi-cluster</module>
+        <module>nifi-framework-cluster-protocol</module>
+        <module>nifi-framework-cluster-web</module>
+        <module>nifi-framework-cluster</module>
         <module>nifi-file-authorization-provider</module>
         <module>nifi-cluster-authorization-provider</module>
         <module>nifi-user-actions</module>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2043a0d..170a95e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -48,7 +48,7 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-distributed-cache-client-service-api</artifactId>
         </dependency>
-               <dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-http-context-map-api</artifactId>
         </dependency>
@@ -169,4 +169,88 @@
             <version>1.7</version>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>src/test/resources/hello.txt</exclude>
+                        
<exclude>src/test/resources/CharacterSetConversionSamples/Converted.txt</exclude>
+                        
<exclude>src/test/resources/CharacterSetConversionSamples/Original.txt</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFile.txt</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt</exclude>
+                        
<exclude>src/test/resources/ExecuteCommand/1000bytes.txt</exclude>
+                        
<exclude>src/test/resources/ExecuteCommand/test.txt</exclude>
+                        
<exclude>src/test/resources/ScanAttribute/dictionary-with-empty-new-lines</exclude>
+                        
<exclude>src/test/resources/ScanAttribute/dictionary-with-extra-info</exclude>
+                        
<exclude>src/test/resources/ScanAttribute/dictionary1</exclude>
+                        
<exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude>
+                        
<exclude>src/test/resources/TestJson/json-sample.json</exclude>
+                        
<exclude>src/test/resources/TestMergeContent/demarcate</exclude>
+                        
<exclude>src/test/resources/TestMergeContent/foot</exclude>
+                        
<exclude>src/test/resources/TestMergeContent/head</exclude>
+                        
<exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude>
+                        
<exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude>
+                        
<exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude>
+                        
<exclude>src/test/resources/TestModifyBytes/testFile.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/$1$1.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/BRue_cRue_RiRey.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/Blu$2e_clu$2e.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/D$d_h$d.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/Good.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/Spider.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/[DODO].txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/cu[$1]_Po[$1].txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/cu_Po.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/food.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextLineByLine/testFile.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-backreference-mapping.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-blank-mapping.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-escaped-dollar-mapping.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping-simple.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-invalid-backreference-mapping.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-mapping.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-no-match-mapping.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-space-mapping.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/colors-without-dashes.txt</exclude>
+                        
<exclude>src/test/resources/TestReplaceTextWithMapping/colors.txt</exclude>
+                        
<exclude>src/test/resources/TestScanContent/helloWorld</exclude>
+                        
<exclude>src/test/resources/TestScanContent/wellthengood-bye</exclude>
+                        
<exclude>src/test/resources/TestSplitText/1.txt</exclude>
+                        
<exclude>src/test/resources/TestSplitText/2.txt</exclude>
+                        
<exclude>src/test/resources/TestSplitText/3.txt</exclude>
+                        
<exclude>src/test/resources/TestSplitText/4.txt</exclude>
+                        
<exclude>src/test/resources/TestSplitText/5.txt</exclude>
+                        
<exclude>src/test/resources/TestSplitText/6.txt</exclude>
+                        
<exclude>src/test/resources/TestSplitText/original.txt</exclude>
+                        
<exclude>src/test/resources/TestTransformXml/math.html</exclude>
+                        
<exclude>src/test/resources/TestTransformXml/tokens.csv</exclude>
+                        
<exclude>src/test/resources/TestTransformXml/tokens.xml</exclude>
+                        
<exclude>src/test/resources/TestUnpackContent/folder/cal.txt</exclude>
+                        
<exclude>src/test/resources/TestUnpackContent/folder/date.txt</exclude>
+                        
<exclude>src/test/resources/TestXml/xml-bundle-1</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFile.txt.bz2</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFile.txt.gz</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFile1.txt.bz2</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFile1.txt.gz</exclude>
+                        
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude>
+                        
<exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude>
+                        
<exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude>
+                        
<exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude>
+                        
<exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude>
+                        
<exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude>
+                        
<exclude>src/test/resources/TestIdentifyMimeType/1.txt.bz2</exclude>
+                        
<exclude>src/test/resources/TestIdentifyMimeType/1.txt.gz</exclude>
+                        
<exclude>src/test/resources/TestIdentifyMimeType/1.zip</exclude>
+                        
<exclude>src/test/resources/TestIdentifyMimeType/flowfilev1.tar</exclude>
+                        
<exclude>src/test/resources/TestUnpackContent/data.tar</exclude>
+                        
<exclude>src/test/resources/TestUnpackContent/data.zip</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>    
 </project>

Reply via email to