http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowResponseMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowResponseMessage.java
new file mode 100644
index 0000000..0d34dae
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowResponseMessage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+
+/**
+ * Protocol message that carries a {@link StandardDataFlow} payload and
+ * reports its type as {@code MessageType.FLOW_RESPONSE}.  The class is
+ * declared as the JAXB root element {@code flowResponseMessage}.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "flowResponseMessage")
+public class FlowResponseMessage extends ProtocolMessage {
+    
+    private StandardDataFlow dataFlow; // payload; null until setDataFlow is called
+    
+    @Override
+    public MessageType getType() {
+        return MessageType.FLOW_RESPONSE;
+    }
+
+    public StandardDataFlow getDataFlow() {
+        return dataFlow;
+    }
+
+    public void setDataFlow(StandardDataFlow dataFlow) {
+        this.dataFlow = dataFlow;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
new file mode 100644
index 0000000..0064cb6
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Protocol message that carries a {@link Heartbeat} payload and reports its
+ * type as {@code MessageType.HEARTBEAT}.  The class is declared as the JAXB
+ * root element {@code heartbeatMessage}.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "heartbeatMessage")
+public class HeartbeatMessage extends ProtocolMessage {
+    
+    private Heartbeat heartbeat; // payload; null until setHeartbeat is called
+    
+    @Override
+    public MessageType getType() {
+        return MessageType.HEARTBEAT;
+    }
+
+    public Heartbeat getHeartbeat() {
+        return heartbeat;
+    }
+
+    public void setHeartbeat(Heartbeat heartbeat) {
+        this.heartbeat = heartbeat;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/MulticastProtocolMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/MulticastProtocolMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/MulticastProtocolMessage.java
new file mode 100644
index 0000000..c6d2d44
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/MulticastProtocolMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Wraps a protocol message and an identifier for sending the message by way
+ * of multicast.  The identifier is necessary for the sender to identify a
+ * message sent by it.
+ * <p>
+ * {@link #getType()} delegates to the wrapped message, so this wrapper has no
+ * message type of its own; it returns {@code null} while no message is set.
+ * The class is declared as the JAXB root element {@code multicastMessage}.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "multicastMessage")
+public class MulticastProtocolMessage extends ProtocolMessage {
+    
+    private ProtocolMessage protocolMessage; // wrapped message; may be null
+    
+    private String id; // identifier supplied by the sender
+    
+    public MulticastProtocolMessage() {} // no-arg constructor leaves both fields null
+
+    public MulticastProtocolMessage(final String id, final ProtocolMessage 
protocolMessage) {
+        this.protocolMessage = protocolMessage;
+        this.id = id;
+    }
+    
+    @Override
+    public MessageType getType() {
+        if(protocolMessage == null) {
+            return null; // nothing wrapped yet, so there is no type to report
+        }
+        return protocolMessage.getType();
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public ProtocolMessage getProtocolMessage() {
+        return protocolMessage;
+    }
+
+    public void setProtocolMessage(ProtocolMessage protocolMessage) {
+        this.protocolMessage = protocolMessage;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
new file mode 100644
index 0000000..9237a92
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import org.apache.nifi.cluster.protocol.NodeBulletins;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Protocol message that carries a {@link NodeBulletins} payload and reports
+ * its type as {@code MessageType.BULLETINS}.  The class is declared as the
+ * JAXB root element {@code nodeBulletinsMessage}.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "nodeBulletinsMessage")
+public class NodeBulletinsMessage extends ProtocolMessage {
+    
+    private NodeBulletins bulletins; // payload; null until setBulletins is called
+    
+    @Override
+    public MessageType getType() {
+        return MessageType.BULLETINS;
+    }
+
+    public NodeBulletins getBulletins() {
+        return bulletins;
+    }
+
+    public void setBulletins(NodeBulletins bulletins) {
+        this.bulletins = bulletins;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PingMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PingMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PingMessage.java
new file mode 100644
index 0000000..ee38deb
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PingMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import java.util.Date;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Protocol message of type {@code MessageType.PING} carrying an identifier
+ * and a timestamp.  The class is declared as the JAXB root element
+ * {@code pingMessage}.
+ * <p>
+ * The timestamp is initialised to the moment the instance is constructed.
+ * {@link #getDate()} and {@link #setDate(Date)} exchange the {@link Date}
+ * reference directly, without copying, so changes made to that object by a
+ * caller are visible through this message.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "pingMessage")
+public class PingMessage extends ProtocolMessage {
+    
+    private String id; // null until setId is called
+    
+    private Date date = new Date(); // defaults to construction time
+
+    public PingMessage() {}
+    
+    public Date getDate() {
+        return date; // same instance that is stored, not a defensive copy
+    }
+
+    public void setDate(Date date) {
+        this.date = date;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+    
+    @Override
+    public MessageType getType() {
+        return MessageType.PING;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
new file mode 100644
index 0000000..a289abc
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+/**
+ * Protocol message of type {@code MessageType.PRIMARY_ROLE} that pairs a
+ * {@link NodeIdentifier} with a boolean {@code primary} flag.  The class is
+ * declared as the JAXB root element {@code primaryRoleAssignmentMessage},
+ * and the node identifier property is bound through
+ * {@link NodeIdentifierAdapter}.
+ * <p>
+ * NOTE(review): presumably the flag states whether the identified node is
+ * being given ({@code true}) or losing ({@code false}) the primary role;
+ * confirm against the code that handles this message.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "primaryRoleAssignmentMessage")
+public class PrimaryRoleAssignmentMessage extends ProtocolMessage {
+
+    private NodeIdentifier nodeId; // null until setNodeId is called
+
+    private boolean primary; // defaults to false
+    
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public boolean isPrimary() {
+        return primary;
+    }
+
+    public void setPrimary(boolean primary) {
+        this.primary = primary;
+    }
+   
+    @Override
+    public MessageType getType() {
+        return MessageType.PRIMARY_ROLE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
new file mode 100644
index 0000000..6bf2a13
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+/**
+ * Abstract base class for the cluster protocol messages in this package.
+ * Each concrete subclass reports its kind through {@link #getType()}.  The
+ * base class additionally holds the distinguished name (DN) of the requestor
+ * in a {@code volatile} field, so a value written by one thread is visible
+ * to others.
+ *
+ * @author unattributed
+ */
+public abstract class ProtocolMessage {
+    private volatile String requestorDN; // null unless setRequestorDN was called
+    
+    public static enum MessageType { // the kinds of message a subclass may report
+        BULLETINS,
+        CONNECTION_REQUEST,
+        CONNECTION_RESPONSE,
+        CONTROLLER_STARTUP_FAILURE,
+        RECONNECTION_FAILURE,
+        DISCONNECTION_REQUEST,
+        EXCEPTION,
+        FLOW_REQUEST,
+        FLOW_RESPONSE,
+        HEARTBEAT,
+        PING,
+        PRIMARY_ROLE,
+        RECONNECTION_REQUEST,
+        RECONNECTION_RESPONSE,
+        SERVICE_BROADCAST,
+    }
+    
+    public abstract MessageType getType(); // implemented by every concrete message
+    
+    /**
+     * Sets the DN of the entity making the request.
+     *
+     * @param dn the requestor's distinguished name; may be <code>null</code>
+     */
+    public void setRequestorDN(final String dn) {
+        this.requestorDN = dn;
+    }
+    
+    /**
+     * Returns the DN of the entity that made the request, if using a secure
+     * socket. Otherwise, returns <code>null</code>.
+     *
+     * @return the value last passed to {@link #setRequestorDN(String)}, or
+     * <code>null</code> if it was never set
+     */
+    public String getRequestorDN() {
+        return requestorDN;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
new file mode 100644
index 0000000..ba45e28
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+@XmlRootElement(name = "reconnectionFailureMessage")
+public class ReconnectionFailureMessage extends ExceptionMessage { // ExceptionMessage variant that also carries a node identifier
+    private NodeIdentifier nodeId; // null until setNodeId is called
+    
+    public ReconnectionFailureMessage() {}
+    
+    @Override
+    public MessageType getType() {
+        return MessageType.RECONNECTION_FAILURE; // replaces whatever type ExceptionMessage itself reports
+    }
+
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) // bind the identifier through NodeIdentifierAdapter
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
new file mode 100644
index 0000000..eab3d5d
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+/**
+ * Protocol message of type {@code MessageType.RECONNECTION_REQUEST}.  The
+ * class is declared as the JAXB root element
+ * {@code reconnectionRequestMessage} and carries:
+ * <ul>
+ * <li>a {@link NodeIdentifier}, bound through {@link NodeIdentifierAdapter}</li>
+ * <li>a {@link StandardDataFlow}</li>
+ * <li>a boolean {@code primary} flag</li>
+ * <li>the manager's remote site listening port and "comms secure" flag, both
+ * held as nullable wrapper types</li>
+ * <li>an instance identifier string</li>
+ * </ul>
+ * Every property is optional as far as this class is concerned: nothing is
+ * validated and all reference-typed properties start out {@code null}.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "reconnectionRequestMessage")
+public class ReconnectionRequestMessage extends ProtocolMessage {
+
+    private NodeIdentifier nodeId;
+    private StandardDataFlow dataFlow;
+    private boolean primary; // defaults to false
+    private Integer managerRemoteSiteListeningPort; // boxed, so null means "not set"
+    private Boolean managerRemoteSiteCommsSecure; // boxed, so null means "not set"
+    private String instanceId;
+    
+    public ReconnectionRequestMessage() {}
+
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public StandardDataFlow getDataFlow() {
+        return dataFlow;
+    }
+
+    public void setDataFlow(StandardDataFlow dataFlow) {
+        this.dataFlow = dataFlow;
+    }
+
+    public boolean isPrimary() {
+        return primary;
+    }
+
+    public void setPrimary(boolean primary) {
+        this.primary = primary;
+    }
+    
+    @Override
+    public MessageType getType() {
+        return MessageType.RECONNECTION_REQUEST;
+    }
+    
+    public void setManagerRemoteSiteListeningPort(final Integer listeningPort) 
{
+        this.managerRemoteSiteListeningPort = listeningPort;
+    }
+    
+    public Integer getManagerRemoteSiteListeningPort() {
+        return managerRemoteSiteListeningPort;
+    }
+    
+    public void setManagerRemoteSiteCommsSecure(final Boolean 
remoteSiteCommsSecure) {
+        this.managerRemoteSiteCommsSecure = remoteSiteCommsSecure;
+    }
+    
+    // NOTE(review): "is" accessor returning boxed Boolean; standard JavaBeans
+    // introspection only recognises "is" for primitive boolean - confirm the
+    // XML binding picks this property up as intended. May return null.
+    public Boolean isManagerRemoteSiteCommsSecure() {
+        return managerRemoteSiteCommsSecure;
+    }
+    
+    public void setInstanceId(final String instanceId) {
+        this.instanceId = instanceId;
+    }
+    
+    public String getInstanceId() {
+        return instanceId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionResponseMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionResponseMessage.java
new file mode 100644
index 0000000..fd0f921
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionResponseMessage.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * This message is used as an "ACK" for a ReconnectionRequestMessage.  It
+ * carries no payload of its own; it only reports its type as
+ * {@code MessageType.RECONNECTION_RESPONSE}.  The class is declared as the
+ * JAXB root element {@code reconnectionResponseMessage}.
+ */
+@XmlRootElement(name = "reconnectionResponseMessage")
+public class ReconnectionResponseMessage extends ProtocolMessage {
+
+    @Override
+    public MessageType getType() {
+        return MessageType.RECONNECTION_RESPONSE;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ServiceBroadcastMessage.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ServiceBroadcastMessage.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ServiceBroadcastMessage.java
new file mode 100644
index 0000000..92708ba
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ServiceBroadcastMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Protocol message of type {@code MessageType.SERVICE_BROADCAST} carrying a
+ * service name, an address string and a port number.  The class is declared
+ * as the JAXB root element {@code serviceBroadcastMessage}.
+ * <p>
+ * NOTE(review): presumably the address and port tell receivers where the
+ * named service can be reached; confirm against the broadcaster and
+ * listener code.  No validation is performed on any of the values.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "serviceBroadcastMessage")
+public class ServiceBroadcastMessage extends ProtocolMessage {
+
+    private String serviceName; // null until setServiceName is called
+    
+    private String address; // null until setAddress is called
+    
+    private int port; // defaults to 0
+    
+    public ServiceBroadcastMessage() {}
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+    }
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.SERVICE_BROADCAST;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/MulticastConfigurationFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/MulticastConfigurationFactoryBean.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/MulticastConfigurationFactoryBean.java
new file mode 100644
index 0000000..fa201bb
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/MulticastConfigurationFactoryBean.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.spring;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.springframework.beans.factory.FactoryBean;
+
+/**
+ * Factory bean for creating a singleton MulticastConfiguration instance.
+ * <p>
+ * The configuration is built lazily on the first call to {@link #getObject()}
+ * and the same instance is returned on every later call.  The NiFi properties
+ * must be injected through {@link #setProperties(NiFiProperties)} before
+ * {@link #getObject()} is first invoked, because the socket timeout is read
+ * from them.
+ */
+public class MulticastConfigurationFactoryBean implements FactoryBean<MulticastConfiguration> {
+    
+    private MulticastConfiguration configuration;
+    private NiFiProperties properties;
+    
+    /**
+     * Returns the shared configuration, creating it on first use.
+     *
+     * @return the lazily created MulticastConfiguration, with its socket
+     * timeout taken from the cluster protocol socket timeout property
+     * (converted to milliseconds) and address reuse enabled
+     * @throws Exception declared by the FactoryBean contract
+     */
+    @Override
+    public MulticastConfiguration getObject() throws Exception {
+        if(configuration == null) {
+            configuration = new MulticastConfiguration();
+            
+            // the duration is converted to milliseconds and narrowed to int for the setter
+            final int timeout = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolSocketTimeout(), TimeUnit.MILLISECONDS);
+            configuration.setSocketTimeout(timeout);
+            configuration.setReuseAddress(true);
+        }
+        return configuration;
+
+    }
+
+    /**
+     * @return the type of object this factory produces
+     */
+    @Override
+    public Class<MulticastConfiguration> getObjectType() {
+        return MulticastConfiguration.class;
+    }
+
+    /**
+     * @return always <code>true</code>; one instance is created and reused
+     */
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+    
+    /**
+     * Injects the properties from which the socket timeout is read.
+     *
+     * @param properties the NiFi properties; must be set before
+     * {@link #getObject()} is first called
+     */
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/ServerSocketConfigurationFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/ServerSocketConfigurationFactoryBean.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/ServerSocketConfigurationFactoryBean.java
new file mode 100644
index 0000000..5b5816d
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/ServerSocketConfigurationFactoryBean.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.spring;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.socket.SSLContextFactory;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+
+import org.springframework.beans.factory.FactoryBean;
+
+/**
+ * Factory bean for creating a singleton ServerSocketConfiguration instance.
+ *
+ * <p>The configuration is built on the first call to {@link #getObject()} from
+ * the injected {@link NiFiProperties} and the same instance is returned on
+ * every later call.</p>
+ */
+public class ServerSocketConfigurationFactoryBean implements FactoryBean<ServerSocketConfiguration> {
+
+    private ServerSocketConfiguration configuration;
+    private NiFiProperties properties;
+
+    /**
+     * Returns the shared server socket configuration, building it on first use.
+     *
+     * <p>The instance is only cached once it has been fully populated. If
+     * reading the timeout or creating the SSL context factory throws, nothing
+     * is cached, so a later call cannot silently receive a configuration that
+     * is missing its timeout or its SSL settings.</p>
+     *
+     * @return the singleton configuration
+     * @throws Exception if building the configuration fails
+     */
+    @Override
+    public ServerSocketConfiguration getObject() throws Exception {
+        if (configuration == null) {
+            final ServerSocketConfiguration created = new ServerSocketConfiguration();
+            created.setNeedClientAuth(properties.getNeedClientAuth());
+
+            // the duration is requested in milliseconds and narrowed to int for the setter
+            final int timeout = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolSocketTimeout(), TimeUnit.MILLISECONDS);
+            created.setSocketTimeout(timeout);
+            created.setReuseAddress(true);
+
+            // an SSL context factory is attached only when the cluster protocol is flagged as secure
+            if (Boolean.parseBoolean(properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_IS_SECURE))) {
+                created.setSSLContextFactory(new SSLContextFactory(properties));
+            }
+
+            // publish only after every setting has been applied
+            configuration = created;
+        }
+        return configuration;
+    }
+
+    /**
+     * @return the type of object produced by this factory
+     */
+    @Override
+    public Class<ServerSocketConfiguration> getObjectType() {
+        return ServerSocketConfiguration.class;
+    }
+
+    /**
+     * @return always {@code true}; a single configuration instance is shared
+     */
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    /**
+     * Injects the properties the configuration is derived from.
+     *
+     * @param properties the NiFi properties to read the socket settings from
+     */
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/SocketConfigurationFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/SocketConfigurationFactoryBean.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/SocketConfigurationFactoryBean.java
new file mode 100644
index 0000000..b438e44
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/SocketConfigurationFactoryBean.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.spring;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.socket.SSLContextFactory;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+
+import org.springframework.beans.factory.FactoryBean;
+
+/**
+ * Factory bean for creating a singleton SocketConfiguration instance.
+ *
+ * <p>The configuration is built on the first call to {@link #getObject()} from
+ * the injected {@link NiFiProperties} and the same instance is returned on
+ * every later call.</p>
+ */
+public class SocketConfigurationFactoryBean implements FactoryBean<SocketConfiguration> {
+
+    private SocketConfiguration configuration;
+
+    private NiFiProperties properties;
+
+    /**
+     * Returns the shared socket configuration, building it on first use.
+     *
+     * <p>The instance is only cached once it has been fully populated. If
+     * reading the timeout or creating the SSL context factory throws, nothing
+     * is cached, so a later call cannot silently receive a configuration that
+     * is missing its timeout or its SSL settings.</p>
+     *
+     * @return the singleton configuration
+     * @throws Exception if building the configuration fails
+     */
+    @Override
+    public SocketConfiguration getObject() throws Exception {
+        if (configuration == null) {
+            final SocketConfiguration created = new SocketConfiguration();
+
+            // the duration is requested in milliseconds and narrowed to int for the setter
+            final int timeout = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolSocketTimeout(), TimeUnit.MILLISECONDS);
+            created.setSocketTimeout(timeout);
+            created.setReuseAddress(true);
+
+            // an SSL context factory is attached only when the cluster protocol is flagged as secure
+            if (Boolean.parseBoolean(properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_IS_SECURE))) {
+                created.setSSLContextFactory(new SSLContextFactory(properties));
+            }
+
+            // publish only after every setting has been applied
+            configuration = created;
+        }
+        return configuration;
+    }
+
+    /**
+     * @return the type of object produced by this factory
+     */
+    @Override
+    public Class<SocketConfiguration> getObjectType() {
+        return SocketConfiguration.class;
+    }
+
+    /**
+     * @return always {@code true}; a single configuration instance is shared
+     */
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    /**
+     * Injects the properties the configuration is derived from.
+     *
+     * @param properties the NiFi properties to read the socket settings from
+     */
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
new file mode 100644
index 0000000..07ea7a4
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- marked as lazy so that cluster protocol beans are not created when the application runs in standalone mode -->
+<beans default-lazy-init="true"
+       xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd">
+
+    <!-- protocol context -->
+    <bean id="protocolContext" class="org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext">
+        <constructor-arg>
+            <util:constant static-field="org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils.JAXB_CONTEXT"/>
+        </constructor-arg>
+    </bean>
+
+    <!-- socket configuration -->
+    <bean id="protocolSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.SocketConfigurationFactoryBean">
+        <property name="properties" ref="nifiProperties"/>
+    </bean>
+
+    <!-- server socket configuration -->
+    <bean id="protocolServerSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.ServerSocketConfigurationFactoryBean">
+        <property name="properties" ref="nifiProperties"/>
+    </bean>
+
+    <!-- multicast configuration -->
+    <bean id="protocolMulticastConfiguration" class="org.apache.nifi.cluster.protocol.spring.MulticastConfigurationFactoryBean">
+        <property name="properties" ref="nifiProperties"/>
+    </bean>
+
+    <!-- cluster manager protocol sender -->
+    <bean id="clusterManagerProtocolSender" class="org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl">
+        <constructor-arg ref="protocolSocketConfiguration"/>
+        <constructor-arg ref="protocolContext"/>
+        <property name="handshakeTimeout">
+            <bean factory-bean="nifiProperties" factory-method="getClusterProtocolConnectionHandshakeTimeout"/>
+        </property>
+    </bean>
+
+    <!-- cluster manager protocol listener -->
+    <bean id="clusterManagerProtocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener">
+        <constructor-arg index="0">
+            <bean factory-bean="nifiProperties" factory-method="getClusterManagerProtocolThreads"/>
+        </constructor-arg>
+        <constructor-arg index="1">
+            <bean factory-bean="nifiProperties" factory-method="getClusterManagerProtocolPort"/>
+        </constructor-arg>
+        <constructor-arg ref="protocolServerSocketConfiguration" index="2"/>
+        <constructor-arg ref="protocolContext" index="3"/>
+    </bean>
+
+    <!-- cluster manager sender/listener -->
+    <bean id="clusterManagerProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener">
+        <constructor-arg ref="clusterManagerProtocolSender"/>
+        <constructor-arg ref="clusterManagerProtocolListener"/>
+    </bean>
+
+    <!-- node protocol sender -->
+    <bean id="nodeProtocolSender" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderImpl">
+        <constructor-arg ref="clusterManagerProtocolServiceLocator"/>
+        <constructor-arg ref="protocolSocketConfiguration"/>
+        <constructor-arg ref="protocolContext"/>
+    </bean>
+
+    <!-- node protocol listener -->
+    <bean id="nodeProtocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener">
+        <constructor-arg index="0">
+            <bean factory-bean="nifiProperties" factory-method="getClusterNodeProtocolThreads"/>
+        </constructor-arg>
+        <constructor-arg index="1">
+            <bean factory-bean="nifiProperties" factory-method="getClusterNodeProtocolPort"/>
+        </constructor-arg>
+        <constructor-arg ref="protocolServerSocketConfiguration" index="2"/>
+        <constructor-arg ref="protocolContext" index="3"/>
+    </bean>
+
+    <!-- node sender/listener -->
+    <bean id="nodeProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener">
+        <constructor-arg ref="nodeProtocolSender"/>
+        <constructor-arg ref="nodeProtocolListener"/>
+    </bean>
+
+    <!-- cluster services broadcaster -->
+    <bean id="clusterServicesBroadcaster" class="org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster">
+        <constructor-arg index="0">
+            <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/>
+        </constructor-arg>
+        <constructor-arg ref="protocolMulticastConfiguration" index="1"/>
+        <constructor-arg ref="protocolContext" index="2"/>
+        <constructor-arg index="3">
+            <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastServiceBroadcastDelay"/>
+        </constructor-arg>
+    </bean>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
new file mode 100644
index 0000000..59837c1
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
+import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl;
+import java.io.IOException;
+import java.net.InetAddress;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests ClusterManagerProtocolSenderImpl against a SocketProtocolListener
+ * started locally, with a mocked ProtocolHandler supplying the responses.
+ *
+ * @author unattributed
+ */
+public class ClusterManagerProtocolSenderImplTest {
+
+    private InetAddress address;
+
+    private int port;
+
+    private SocketProtocolListener listener;
+
+    private ClusterManagerProtocolSenderImpl sender;
+
+    private ProtocolHandler mockHandler;
+
+    /**
+     * Starts a listener backed by the mock handler and creates the sender under test.
+     */
+    @Before
+    public void setup() throws IOException {
+        address = InetAddress.getLocalHost();
+        mockHandler = mock(ProtocolHandler.class);
+
+        final ServerSocketConfiguration serverConfig = new ServerSocketConfiguration();
+        serverConfig.setSocketTimeout(2000);
+
+        final ProtocolContext context = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        // port 0 is requested; the port actually in use is read back once the listener has started
+        listener = new SocketProtocolListener(5, 0, serverConfig, context);
+        listener.addHandler(mockHandler);
+        listener.start();
+        port = listener.getPort();
+
+        sender = new ClusterManagerProtocolSenderImpl(new SocketConfiguration(), context);
+    }
+
+    /**
+     * Stops the listener if it is still running.
+     */
+    @After
+    public void teardown() throws IOException {
+        if (!listener.isRunning()) {
+            return;
+        }
+        listener.stop();
+    }
+
+    /**
+     * A flow response from the handler is returned to the caller.
+     */
+    @Test
+    public void testRequestFlow() throws Exception {
+        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage());
+
+        final FlowResponseMessage flowResponse = sender.requestFlow(newFlowRequest());
+
+        assertNotNull(flowResponse);
+    }
+
+    /**
+     * A response of the wrong message type must surface as a ProtocolException.
+     */
+    @Test
+    public void testRequestFlowWithBadResponseMessage() throws Exception {
+        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
+
+        final FlowRequestMessage flowRequest = newFlowRequest();
+        try {
+            sender.requestFlow(flowRequest);
+            fail("failed to throw exception");
+        } catch (ProtocolException expected) {
+            // this is the outcome the test is looking for
+        }
+    }
+
+    /**
+     * A handler that answers later than the sender's socket timeout must
+     * surface as a ProtocolException.
+     */
+    @Test
+    public void testRequestFlowDelayedResponse() throws Exception {
+        final int timeoutMillis = 250;
+        sender.getSocketConfiguration().setSocketTimeout(timeoutMillis);
+
+        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() {
+            @Override
+            public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable {
+                // hold the reply back for three times the sender's timeout
+                Thread.sleep(timeoutMillis * 3);
+                return new FlowResponseMessage();
+            }
+        });
+
+        final FlowRequestMessage flowRequest = newFlowRequest();
+        try {
+            sender.requestFlow(flowRequest);
+            fail("failed to throw exception");
+        } catch (ProtocolException expected) {
+            // this is the outcome the test is looking for
+        }
+    }
+
+    /**
+     * Builds a flow request whose node identifier points at the listener started in setup().
+     */
+    private FlowRequestMessage newFlowRequest() {
+        final FlowRequestMessage flowRequest = new FlowRequestMessage();
+        flowRequest.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
+        return flowRequest;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java
new file mode 100644
index 0000000..91f81af
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastUtils;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests ClusterServiceDiscovery by sending marshalled protocol messages to
+ * the multicast address it was constructed with.
+ *
+ * @author unattributed
+ */
+public class ClusterServiceDiscoveryTest {
+
+    private ClusterServiceDiscovery discovery;
+
+    private String serviceName;
+
+    private MulticastSocket socket;
+
+    private InetSocketAddress multicastAddress;
+
+    private MulticastConfiguration configuration;
+
+    private ProtocolContext protocolContext;
+
+    /**
+     * Starts the discovery service and opens the socket used to send test packets.
+     */
+    @Before
+    public void setup() throws Exception {
+        serviceName = "some-service";
+        multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
+        configuration = new MulticastConfiguration();
+        protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        discovery = new ClusterServiceDiscovery(serviceName, multicastAddress, configuration, protocolContext);
+        discovery.start();
+
+        socket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration);
+    }
+
+    /**
+     * Stops the discovery service if it is running and always closes the socket.
+     */
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (discovery.isRunning()) {
+                discovery.stop();
+            }
+        } finally {
+            MulticastUtils.closeQuietly(socket);
+        }
+    }
+
+    /**
+     * No service is known before any message has been sent.
+     */
+    @Test
+    public void testGetAddressOnStartup() {
+        assertNull(discovery.getService());
+    }
+
+    /**
+     * After a service broadcast is sent, the discovery reports the broadcast address and port.
+     */
+    @Ignore("This test has an NPE after ignoring another...perhaps has a bad inter-test dependency")
+    @Test
+    public void testGetAddressAfterBroadcast() throws Exception {
+        final ServiceBroadcastMessage broadcast = new ServiceBroadcastMessage();
+        broadcast.setServiceName("some-service");
+        broadcast.setAddress("3.3.3.3");
+        broadcast.setPort(1234);
+
+        sendToMulticastAddress(broadcast);
+
+        // pause before inspecting the discovery's state
+        Thread.sleep(250);
+
+        final InetSocketAddress discoveredAddress = discovery.getService().getServiceAddress();
+        assertEquals("some-service", discovery.getServiceName());
+        assertEquals("3.3.3.3", discoveredAddress.getHostName());
+        assertEquals(1234, discoveredAddress.getPort());
+    }
+
+    /**
+     * A message that is not a service broadcast leaves the discovery without a service.
+     */
+    @Test
+    public void testBadBroadcastMessage() throws Exception {
+        final ProtocolMessage ping = new PingMessage();
+
+        sendToMulticastAddress(ping);
+
+        // pause before inspecting the discovery's state
+        Thread.sleep(250);
+
+        assertNull(discovery.getService());
+    }
+
+    /**
+     * Marshals the given message and sends it as one datagram to the multicast address.
+     */
+    private void sendToMulticastAddress(final ProtocolMessage message) throws Exception {
+        // marshal message to output stream
+        final ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        marshaller.marshal(message, buffer);
+
+        final byte[] payload = buffer.toByteArray();
+        socket.send(new DatagramPacket(payload, payload.length, multicastAddress));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
new file mode 100644
index 0000000..b1c156b
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+import org.mockito.stubbing.OngoingStubbing;
+
+public class ClusterServiceLocatorTest {
+    
+    private ClusterServiceDiscovery mockServiceDiscovery;
+    
+    private int fixedPort;
+    
+    private DiscoverableService fixedService;
+    
+    private ClusterServiceLocator serviceDiscoveryLocator;
+    
+    private ClusterServiceLocator serviceDiscoveryFixedPortLocator;
+    
+    private ClusterServiceLocator fixedServiceLocator;
+    
+    @Before
+    public void setup() throws Exception {
+        
+        fixedPort = 1;
+        mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
+        fixedService = new DiscoverableServiceImpl("some-service", 
InetSocketAddress.createUnresolved("some-host", 20));
+        
+        serviceDiscoveryLocator = new 
ClusterServiceLocator(mockServiceDiscovery);
+        serviceDiscoveryFixedPortLocator = new 
ClusterServiceLocator(mockServiceDiscovery, fixedPort);
+        fixedServiceLocator = new ClusterServiceLocator(fixedService);
+        
+    }
+    
+    @Test
+    public void getServiceWhenServiceDiscoveryNotStarted() {
+        assertNull(serviceDiscoveryLocator.getService());
+    }
+    
+    @Test
+    public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
+        assertNull(serviceDiscoveryLocator.getService());
+    }
+    
+    @Test
+    public void getServiceWhenFixedServiceNotStarted() {
+        assertEquals(fixedService, fixedServiceLocator.getService());
+    }
+    
+    @Test
+    public void getServiceNotOnFirstAttempt() {
+                
+        ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
+        config.setNumAttempts(2);
+        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+        config.setTimeBetweenAttempts(1);
+        
+        serviceDiscoveryLocator.setAttemptsConfig(config);
+        
+        OngoingStubbing<DiscoverableService> stubbing = null;
+        for(int i = 0; i < config.getNumAttempts() - 1; i++) {
+            if(stubbing == null) {
+                stubbing = 
when(mockServiceDiscovery.getService()).thenReturn(null);
+            } else {
+                stubbing.thenReturn(null);
+            }
+        }
+        stubbing.thenReturn(fixedService);
+        
+        assertEquals(fixedService, serviceDiscoveryLocator.getService());
+        
+    }
+    
+    @Test
+    public void getServiceNotOnFirstAttemptWithFixedPort() {
+        
+        ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
+        config.setNumAttempts(2);
+        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
+        config.setTimeBetweenAttempts(1);
+        
+        serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);
+        
+        OngoingStubbing<DiscoverableService> stubbing = null;
+        for(int i = 0; i < config.getNumAttempts() - 1; i++) {
+            if(stubbing == null) {
+                stubbing = 
when(mockServiceDiscovery.getService()).thenReturn(null);
+            } else {
+                stubbing.thenReturn(null);
+            }
+        }
+        stubbing.thenReturn(fixedService);
+        
+        InetSocketAddress resultAddress = 
InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(),
 fixedPort);
+        DiscoverableService resultService = new 
DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
+        assertEquals(resultService, 
serviceDiscoveryFixedPortLocator.getService());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
new file mode 100644
index 0000000..ec1f26d
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
+import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener;
+import java.net.InetSocketAddress;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Wires a {@link ClusterServicesBroadcaster} and a
+ * {@link MulticastProtocolListener} to the same multicast address and checks
+ * that a broadcast service announcement reaches a registered handler.
+ *
+ * @author unattributed
+ */
+public class ClusterServicesBroadcasterTest {
+
+    private ClusterServicesBroadcaster broadcaster;
+
+    private MulticastProtocolListener listener;
+
+    private DummyProtocolHandler handler;
+
+    private InetSocketAddress multicastAddress;
+
+    private DiscoverableService broadcastedService;
+
+    private ProtocolContext protocolContext;
+
+    private MulticastConfiguration configuration;
+
+    /**
+     * Builds the broadcaster (with one registered service) and the listener
+     * (with one recording handler). Neither is started here.
+     */
+    @Before
+    public void setup() throws Exception {
+
+        broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111));
+
+        multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
+
+        configuration = new MulticastConfiguration();
+
+        protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        // NOTE(review): "500 ms" is presumably the delay between broadcasts - confirm against ClusterServicesBroadcaster.
+        broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms");
+        broadcaster.addService(broadcastedService);
+
+        handler = new DummyProtocolHandler();
+        listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext);
+        listener.addHandler(handler);
+    }
+
+    /**
+     * Stops whatever is still running. The listener is stopped in a
+     * {@code finally} block so that a failure while stopping the broadcaster
+     * cannot leave the listener running for later tests. Fields are
+     * null-checked because JUnit runs {@code @After} methods even when
+     * {@code @Before} failed part-way through.
+     */
+    @After
+    public void teardown() {
+
+        try {
+            if (broadcaster != null && broadcaster.isRunning()) {
+                broadcaster.stop();
+            }
+        } finally {
+            try {
+                if (listener != null && listener.isRunning()) {
+                    listener.stop();
+                }
+            } catch (Exception ex) {
+                // Best effort: a failure to stop the listener must not hide the test result.
+                ex.printStackTrace(System.out);
+            }
+        }
+
+    }
+
+    /**
+     * Starts both ends, waits one second, and then expects the handler to have
+     * seen a service broadcast describing {@code broadcastedService}.
+     */
+    @Ignore("fails needs to be fixed")
+    @Test
+    public void testBroadcastReceived() throws Exception {
+
+        broadcaster.start();
+        listener.start();
+
+        Thread.sleep(1000);
+
+        listener.stop();
+
+        assertNotNull(handler.getProtocolMessage());
+        assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType());
+        final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage();
+        assertEquals(broadcastedService.getServiceName(), msg.getServiceName());
+        assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress());
+        assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort());
+    }
+
+    /**
+     * Accepts every message and remembers the most recent one.
+     */
+    private static class DummyProtocolHandler implements ProtocolHandler {
+
+        // Written by whichever thread delivers the message (the test thread only
+        // sleeps) and read by the test thread, hence volatile.
+        private volatile ProtocolMessage protocolMessage;
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+            this.protocolMessage = msg;
+            return null;
+        }
+
+        /**
+         * @return the last message passed to {@link #handle(ProtocolMessage)},
+         * or {@code null} if none has been handled
+         */
+        public ProtocolMessage getProtocolMessage() {
+            return protocolMessage;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
new file mode 100644
index 0000000..af00590
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastUtils;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Sends raw datagrams to a running {@link MulticastProtocolListener} and
+ * checks which of them reach the registered handlers.
+ *
+ * @author unattributed
+ */
+public class MulticastProtocolListenerTest {
+
+    private MulticastProtocolListener listener;
+
+    private MulticastSocket socket;
+
+    private InetSocketAddress address;
+
+    private MulticastConfiguration configuration;
+
+    private ProtocolContext protocolContext;
+
+    /**
+     * Starts the listener on the test multicast address and opens the socket
+     * the tests use to send packets to it.
+     */
+    @Before
+    public void setup() throws Exception {
+
+        address = new InetSocketAddress("226.1.1.1", 60000);
+        configuration = new MulticastConfiguration();
+
+        protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new MulticastProtocolListener(5, address, configuration, protocolContext);
+        listener.start();
+
+        socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration);
+    }
+
+    /**
+     * Stops the listener and always closes the sending socket. Fields are
+     * null-checked because JUnit runs {@code @After} methods even when
+     * {@code @Before} failed before assigning them.
+     */
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (listener != null && listener.isRunning()) {
+                listener.stop();
+            }
+        } finally {
+            if (socket != null) {
+                MulticastUtils.closeQuietly(socket);
+            }
+        }
+    }
+
+    /**
+     * A one-byte packet is not a valid protocol message, so no handler is
+     * expected to receive anything.
+     */
+    @Test
+    public void testBadRequest() throws Exception {
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
+        listener.addHandler(handler);
+        DatagramPacket packet = new DatagramPacket(new byte[] {5}, 1, address);
+        socket.send(packet);
+        Thread.sleep(250);
+        assertEquals(0, handler.getMessages().size());
+    }
+
+    /**
+     * Marshals a ping wrapped in a multicast message, sends it, and expects the
+     * handler to receive exactly one message of the ping's type.
+     */
+    @Ignore("this test works sometimes and fails others - needs work to be reliable")
+    @Test
+    public void testRequest() throws Exception {
+
+        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
+        listener.addHandler(handler);
+
+        ProtocolMessage msg = new PingMessage();
+        MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg);
+
+        // marshal message to output stream
+        ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        marshaller.marshal(multicastMsg, baos);
+        byte[] requestPacketBytes = baos.toByteArray();
+        DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address);
+        socket.send(packet);
+
+        Thread.sleep(250);
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+
+    }
+
+    /**
+     * Records every message and returns it unchanged as the response.
+     */
+    private static class ReflexiveProtocolHandler implements ProtocolHandler {
+
+        // Filled by the thread that delivers messages while the test thread only
+        // sleeps and then reads; Thread.sleep gives no visibility guarantee, so a
+        // thread-safe list is used.
+        private final List<ProtocolMessage> messages = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+            messages.add(msg);
+            return msg;
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        /**
+         * @return the messages handled so far, in arrival order
+         */
+        public List<ProtocolMessage> getMessages() {
+            return messages;
+        }
+
+    }
+
+    /**
+     * Records every message, then sleeps for a fixed time before returning
+     * {@code null}.
+     */
+    private static class DelayedProtocolHandler implements ProtocolHandler {
+
+        /** Sleep time in milliseconds applied after recording each message. */
+        private final int delay;
+
+        // See ReflexiveProtocolHandler: written and read on different threads.
+        private final List<ProtocolMessage> messages = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+        public DelayedProtocolHandler(int delay) {
+            this.delay = delay;
+        }
+
+        @Override
+        public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+            try {
+                messages.add(msg);
+                Thread.sleep(delay);
+                return null;
+            } catch (final InterruptedException ie) {
+                // Keep the interrupt visible to the calling thread's owner.
+                Thread.currentThread().interrupt();
+                throw new ProtocolException(ie);
+            }
+
+        }
+
+        @Override
+        public boolean canHandle(ProtocolMessage msg) {
+            return true;
+        }
+
+        /**
+         * @return the messages handled so far, in arrival order
+         */
+        public List<ProtocolMessage> getMessages() {
+            return messages;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
new file mode 100644
index 0000000..1c5ba9e
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
+import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
+import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderImpl;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import 
org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Runs a {@link NodeProtocolSenderImpl} against a local
+ * {@link SocketProtocolListener} whose handler and service locator are
+ * Mockito mocks.
+ *
+ * @author unattributed
+ */
+@Ignore("Randomly tests... probably timing-specific")
+public class NodeProtocolSenderImplTest {
+
+    private SocketProtocolListener listener;
+    private NodeProtocolSenderImpl sender;
+    private DiscoverableService service;
+    private ServerSocketConfiguration serverSocketConfiguration;
+    private ClusterServiceLocator mockServiceLocator;
+    private ProtocolHandler mockHandler;
+    private NodeIdentifier nodeIdentifier;
+
+    /**
+     * Starts a listener on an ephemeral port with the mock handler attached and
+     * creates the sender under test, which resolves its target through the mock
+     * locator.
+     */
+    @Before
+    public void setup() throws IOException {
+
+        serverSocketConfiguration = new ServerSocketConfiguration();
+
+        mockServiceLocator = mock(ClusterServiceLocator.class);
+        mockHandler = mock(ProtocolHandler.class);
+
+        nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678);
+
+        final ProtocolContext context = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+
+        listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, context);
+        listener.setShutdownListenerSeconds(3);
+        listener.addHandler(mockHandler);
+        listener.start();
+
+        // The service points at whatever port the listener actually bound.
+        final InetSocketAddress listenerAddress = new InetSocketAddress("localhost", listener.getPort());
+        service = new DiscoverableServiceImpl("some-service", listenerAddress);
+
+        final SocketConfiguration senderSocketConfig = new SocketConfiguration();
+        senderSocketConfig.setReuseAddress(true);
+        sender = new NodeProtocolSenderImpl(mockServiceLocator, senderSocketConfig, context);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        if (!listener.isRunning()) {
+            return;
+        }
+        listener.stop();
+    }
+
+    /**
+     * Stubs the locator to resolve to {@code located} and the handler to accept
+     * every message.
+     */
+    private void stubLocatorAndAcceptAll(final DiscoverableService located) throws Exception {
+        when(mockServiceLocator.getService()).thenReturn(located);
+        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
+    }
+
+    /**
+     * @return a connection request message for {@code nodeIdentifier}
+     */
+    private ConnectionRequestMessage newConnectionRequest() {
+        final ConnectionRequestMessage requestMessage = new ConnectionRequestMessage();
+        requestMessage.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
+        return requestMessage;
+    }
+
+    @Test
+    public void testConnect() throws Exception {
+
+        stubLocatorAndAcceptAll(service);
+
+        final StandardDataFlow flow = new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]);
+        final ConnectionResponseMessage cannedResponse = new ConnectionResponseMessage();
+        cannedResponse.setConnectionResponse(
+                new ConnectionResponse(nodeIdentifier, flow, false, null, null, UUID.randomUUID().toString()));
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(cannedResponse);
+
+        final ConnectionResponseMessage actual = sender.requestConnection(newConnectionRequest());
+        assertNotNull(actual);
+    }
+
+    @Test(expected = UnknownServiceAddressException.class)
+    public void testConnectNoClusterManagerAddress() throws Exception {
+
+        // The locator cannot resolve any service.
+        stubLocatorAndAcceptAll(null);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage());
+
+        sender.requestConnection(newConnectionRequest());
+        fail("failed to throw exception");
+    }
+
+    @Test(expected = ProtocolException.class)
+    public void testConnectBadResponse() throws Exception {
+
+        stubLocatorAndAcceptAll(service);
+        // A ping is returned where a connection response is requested.
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
+
+        sender.requestConnection(newConnectionRequest());
+        fail("failed to throw exception");
+
+    }
+
+    @Test(expected = ProtocolException.class)
+    public void testConnectDelayedResponse() throws Exception {
+
+        final int time = 250;
+        sender.getSocketConfiguration().setSocketTimeout(time);
+        stubLocatorAndAcceptAll(service);
+
+        // The handler takes three times the socket timeout to answer.
+        final Answer<ConnectionResponseMessage> slowAnswer = new Answer<ConnectionResponseMessage>() {
+            @Override
+            public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable {
+                Thread.sleep(time * 3);
+                return new ConnectionResponseMessage();
+            }
+        };
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(slowAnswer);
+
+        sender.requestConnection(newConnectionRequest());
+        fail("failed to throw exception");
+
+    }
+
+    @Test
+    public void testHeartbeat() throws Exception {
+
+        stubLocatorAndAcceptAll(service);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
+
+        final NodeIdentifier heartbeatNode = new NodeIdentifier("id", "localhost", 3, "localhost", 4);
+        final HeartbeatMessage heartbeatMessage = new HeartbeatMessage();
+        heartbeatMessage.setHeartbeat(new Heartbeat(heartbeatNode, false, false, new byte[] {1, 2, 3}));
+        sender.heartbeat(heartbeatMessage);
+    }
+
+    @Test
+    public void testNotifyControllerStartupFailure() throws Exception {
+
+        stubLocatorAndAcceptAll(service);
+        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
+
+        final ControllerStartupFailureMessage failureMessage = new ControllerStartupFailureMessage();
+        failureMessage.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1));
+        failureMessage.setExceptionMessage("some exception");
+        sender.notifyControllerStartupFailure(failureMessage);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
new file mode 100644
index 0000000..07ee83a
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl.testutils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * Test handler that accepts every message, records it, and then sleeps for a
+ * fixed time before returning {@code null} as the response.
+ *
+ * @author unattributed
+ */
+public class DelayedProtocolHandler implements ProtocolHandler {
+
+    /** Sleep time in milliseconds applied after recording each message. */
+    private final int delay;
+
+    /** Messages handled so far, in arrival order. */
+    private final List<ProtocolMessage> messages = new ArrayList<>();
+
+    /**
+     * @param delay how long, in milliseconds, {@link #handle(ProtocolMessage)}
+     * sleeps after recording a message
+     */
+    public DelayedProtocolHandler(int delay) {
+        this.delay = delay;
+    }
+
+    /**
+     * Records the message, sleeps for the configured delay and returns
+     * {@code null}.
+     *
+     * @param msg the message to record
+     * @return always {@code null}
+     * @throws ProtocolException if the sleep is interrupted; the thread's
+     * interrupt status is restored before the exception is thrown
+     */
+    @Override
+    public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
+        try {
+            messages.add(msg);
+            Thread.sleep(delay);
+            return null;
+        } catch (final InterruptedException ie) {
+            // Catching InterruptedException clears the flag; set it again so the
+            // code that owns this thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new ProtocolException(ie);
+        }
+
+    }
+
+    @Override
+    public boolean canHandle(ProtocolMessage msg) {
+        return true;
+    }
+
+    /**
+     * @return the live list of messages handled so far
+     */
+    public List<ProtocolMessage> getMessages() {
+        return messages;
+    }
+}

Reply via email to