NIFI-557 fixed the correct test and removed extraneous/duplicative ones - build 
now works

Signed-off-by: joewitt <joew...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7853e3c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7853e3c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7853e3c1

Branch: refs/heads/master
Commit: 7853e3c165bbbf0d4c3bcb116bd9d9512f5e3578
Parents: 997ed94
Author: joewitt <joew...@apache.org>
Authored: Fri May 1 16:17:25 2015 -0400
Committer: joewitt <joew...@apache.org>
Committed: Sat May 2 16:03:28 2015 -0400

----------------------------------------------------------------------
 .../impl/SocketProtocolListenerTest.java        | 133 ++++++++++++
 .../ClusterManagerProtocolSenderImplTest.java   | 144 -------------
 .../impl/ClusterServiceLocatorTest.java         | 121 -----------
 .../impl/ClusterServicesBroadcasterTest.java    | 132 ------------
 .../impl/MulticastProtocolListenerTest.java     | 171 ----------------
 .../impl/NodeProtocolSenderImplTest.java        | 202 -------------------
 .../impl/SocketProtocolListenerTest.java        | 133 ------------
 .../testutils/DelayedProtocolHandler.java       |  57 ------
 .../testutils/ReflexiveProtocolHandler.java     |  47 -----
 9 files changed, 133 insertions(+), 1007 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
new file mode 100644
index 0000000..7a91c29
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
+import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler;
+import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author unattributed
+ */
+public class SocketProtocolListenerTest {
+
+    private SocketProtocolListener listener;
+
+    private Socket socket;
+
+    private ProtocolMessageMarshaller<ProtocolMessage> marshaller;
+
+    private ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;
+
+    @Before
+    public void setup() throws Exception {
+
+        final ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
+        marshaller = protocolContext.createMarshaller();
+        unmarshaller = protocolContext.createUnmarshaller();
+
+        ServerSocketConfiguration configuration = new 
ServerSocketConfiguration();
+        configuration.setSocketTimeout(1000);
+
+        listener = new SocketProtocolListener(5, 0, configuration, 
protocolContext);
+        listener.start();
+
+        int port = listener.getPort();
+
+        SocketConfiguration config = new SocketConfiguration();
+        config.setReuseAddress(true);
+        config.setSocketTimeout(1000);
+        socket = SocketUtils.createSocket(new InetSocketAddress("localhost", 
port), config);
+    }
+
+    @After
+    public void teardown() throws IOException {
+        try {
+            if (listener.isRunning()) {
+                listener.stop();
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    @Test
+    public void testBadRequest() throws Exception {
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
+        listener.addHandler(handler);
+        socket.getOutputStream().write(5);
+        Thread.sleep(250);
+        assertEquals(0, handler.getMessages().size());
+    }
+
+    @Test
+    public void testRequest() throws Exception {
+        ProtocolMessage msg = new PingMessage();
+
+        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
+        listener.addHandler(handler);
+
+        // marshal message to output stream
+        marshaller.marshal(msg, socket.getOutputStream());
+
+        // unmarshall response and return
+        ProtocolMessage response = 
unmarshaller.unmarshal(socket.getInputStream());
+        assertEquals(msg.getType(), response.getType());
+
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+    }
+
+    @Test
+    public void testDelayedRequest() throws Exception {
+        ProtocolMessage msg = new PingMessage();
+
+        DelayedProtocolHandler handler = new DelayedProtocolHandler(2000);
+        listener.addHandler(handler);
+
+        // marshal message to output stream
+        marshaller.marshal(msg, socket.getOutputStream());
+
+        try {
+            socket.getInputStream().read();
+            fail("Socket timeout not received.");
+        } catch (SocketTimeoutException ste) {
+        }
+
+        assertEquals(1, handler.getMessages().size());
+        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
deleted file mode 100644
index 9e4237f..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.junit.After;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * @author unattributed
- */
-public class ClusterManagerProtocolSenderImplTest {
-
-    private InetAddress address;
-
-    private int port;
-
-    private SocketProtocolListener listener;
-
-    private ClusterManagerProtocolSenderImpl sender;
-
-    private ProtocolHandler mockHandler;
-
-    @Before
-    public void setup() throws IOException, InterruptedException {
-
-        address = InetAddress.getLocalHost();
-        final ServerSocketConfiguration serverSocketConfiguration = new 
ServerSocketConfiguration();
-
-        mockHandler = mock(ProtocolHandler.class);
-
-        final ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
-        listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, 
protocolContext);
-        listener.addHandler(mockHandler);
-        listener.start();
-
-        // Need to be sure that we give the listener plenty of time to 
startup. Otherwise, we get intermittent
-        // test failures because the Thread started by listener.start() isn't 
ready to accept connections
-        // before we make them.
-        Thread.sleep(1000L);
-
-        port = listener.getPort();
-
-        final SocketConfiguration socketConfiguration = new 
SocketConfiguration();
-        sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, 
protocolContext);
-    }
-
-    @After
-    public void teardown() throws IOException {
-        if (listener.isRunning()) {
-            listener.stop();
-        }
-    }
-
-    @Test
-    public void testRequestFlow() throws Exception {
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
FlowResponseMessage());
-        final FlowRequestMessage request = new FlowRequestMessage();
-        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
"localhost", port));
-        final FlowResponseMessage response = sender.requestFlow(request);
-        assertNotNull(response);
-    }
-
-    @Test
-    public void testRequestFlowWithBadResponseMessage() throws Exception {
-
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
PingMessage());
-        final FlowRequestMessage request = new FlowRequestMessage();
-        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
"localhost", port));
-        try {
-            sender.requestFlow(request);
-            fail("failed to throw exception");
-        } catch (final ProtocolException pe) {
-        }
-
-    }
-
-    @Test
-    public void testRequestFlowDelayedResponse() throws Exception {
-
-        final int time = 250;
-        sender.getSocketConfiguration().setSocketTimeout(time);
-
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new 
Answer<FlowResponseMessage>() {
-            @Override
-            public FlowResponseMessage answer(final InvocationOnMock 
invocation) throws Throwable {
-                Thread.sleep(time * 3);
-                return new FlowResponseMessage();
-            }
-        });
-        final FlowRequestMessage request = new FlowRequestMessage();
-        request.setNodeId(new NodeIdentifier("id", "api-address", 1, 
"localhost", port));
-        try {
-            sender.requestFlow(request);
-            fail("failed to throw exception");
-        } catch (final ProtocolException pe) {
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
deleted file mode 100644
index ea40150..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import org.junit.Before;
-import org.junit.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import org.mockito.stubbing.OngoingStubbing;
-
-public class ClusterServiceLocatorTest {
-
-    private ClusterServiceDiscovery mockServiceDiscovery;
-
-    private int fixedPort;
-
-    private DiscoverableService fixedService;
-
-    private ClusterServiceLocator serviceDiscoveryLocator;
-
-    private ClusterServiceLocator serviceDiscoveryFixedPortLocator;
-
-    private ClusterServiceLocator fixedServiceLocator;
-
-    @Before
-    public void setup() throws Exception {
-
-        fixedPort = 1;
-        mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
-        fixedService = new DiscoverableServiceImpl("some-service", 
InetSocketAddress.createUnresolved("some-host", 20));
-
-        serviceDiscoveryLocator = new 
ClusterServiceLocator(mockServiceDiscovery);
-        serviceDiscoveryFixedPortLocator = new 
ClusterServiceLocator(mockServiceDiscovery, fixedPort);
-        fixedServiceLocator = new ClusterServiceLocator(fixedService);
-
-    }
-
-    @Test
-    public void getServiceWhenServiceDiscoveryNotStarted() {
-        assertNull(serviceDiscoveryLocator.getService());
-    }
-
-    @Test
-    public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
-        assertNull(serviceDiscoveryLocator.getService());
-    }
-
-    @Test
-    public void getServiceWhenFixedServiceNotStarted() {
-        assertEquals(fixedService, fixedServiceLocator.getService());
-    }
-
-    @Test
-    public void getServiceNotOnFirstAttempt() {
-
-        ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
-        config.setNumAttempts(2);
-        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
-        config.setTimeBetweenAttempts(1);
-
-        serviceDiscoveryLocator.setAttemptsConfig(config);
-
-        OngoingStubbing<DiscoverableService> stubbing = null;
-        for (int i = 0; i < config.getNumAttempts() - 1; i++) {
-            if (stubbing == null) {
-                stubbing = 
when(mockServiceDiscovery.getService()).thenReturn(null);
-            } else {
-                stubbing.thenReturn(null);
-            }
-        }
-        stubbing.thenReturn(fixedService);
-
-        assertEquals(fixedService, serviceDiscoveryLocator.getService());
-
-    }
-
-    @Test
-    public void getServiceNotOnFirstAttemptWithFixedPort() {
-
-        ClusterServiceLocator.AttemptsConfig config = new 
ClusterServiceLocator.AttemptsConfig();
-        config.setNumAttempts(2);
-        config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
-        config.setTimeBetweenAttempts(1);
-
-        serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);
-
-        OngoingStubbing<DiscoverableService> stubbing = null;
-        for (int i = 0; i < config.getNumAttempts() - 1; i++) {
-            if (stubbing == null) {
-                stubbing = 
when(mockServiceDiscovery.getService()).thenReturn(null);
-            } else {
-                stubbing.thenReturn(null);
-            }
-        }
-        stubbing.thenReturn(fixedService);
-
-        InetSocketAddress resultAddress = 
InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(),
 fixedPort);
-        DiscoverableService resultService = new 
DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
-        assertEquals(resultService, 
serviceDiscoveryFixedPortLocator.getService());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
deleted file mode 100644
index 0f834fc..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.net.InetSocketAddress;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author unattributed
- */
-public class ClusterServicesBroadcasterTest {
-
-    private ClusterServicesBroadcaster broadcaster;
-
-    private MulticastProtocolListener listener;
-
-    private DummyProtocolHandler handler;
-
-    private InetSocketAddress multicastAddress;
-
-    private DiscoverableService broadcastedService;
-
-    private ProtocolContext protocolContext;
-
-    private MulticastConfiguration configuration;
-
-    @Before
-    public void setup() throws Exception {
-
-        broadcastedService = new DiscoverableServiceImpl("some-service", new 
InetSocketAddress("localhost", 11111));
-
-        multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
-
-        configuration = new MulticastConfiguration();
-
-        protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
-        broadcaster = new ClusterServicesBroadcaster(multicastAddress, 
configuration, protocolContext, "500 ms");
-        broadcaster.addService(broadcastedService);
-
-        handler = new DummyProtocolHandler();
-        listener = new MulticastProtocolListener(5, multicastAddress, 
configuration, protocolContext);
-        listener.addHandler(handler);
-    }
-
-    @After
-    public void teardown() {
-
-        if (broadcaster.isRunning()) {
-            broadcaster.stop();
-        }
-
-        try {
-            if (listener.isRunning()) {
-                listener.stop();
-            }
-        } catch (Exception ex) {
-            ex.printStackTrace(System.out);
-        }
-
-    }
-
-    @Test
-    @Ignore
-    public void testBroadcastReceived() throws Exception {
-
-        broadcaster.start();
-        listener.start();
-
-        Thread.sleep(1000);
-
-        listener.stop();
-
-        assertNotNull(handler.getProtocolMessage());
-        assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, 
handler.getProtocolMessage().getType());
-        final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) 
handler.getProtocolMessage();
-        assertEquals(broadcastedService.getServiceName(), 
msg.getServiceName());
-        assertEquals(broadcastedService.getServiceAddress().getHostName(), 
msg.getAddress());
-        assertEquals(broadcastedService.getServiceAddress().getPort(), 
msg.getPort());
-    }
-
-    private class DummyProtocolHandler implements ProtocolHandler {
-
-        private ProtocolMessage protocolMessage;
-
-        @Override
-        public boolean canHandle(ProtocolMessage msg) {
-            return true;
-        }
-
-        @Override
-        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
-            this.protocolMessage = msg;
-            return null;
-        }
-
-        public ProtocolMessage getProtocolMessage() {
-            return protocolMessage;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
deleted file mode 100644
index f5037a8..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastUtils;
-import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author unattributed
- */
-public class MulticastProtocolListenerTest {
-
-    private MulticastProtocolListener listener;
-
-    private MulticastSocket socket;
-
-    private InetSocketAddress address;
-
-    private MulticastConfiguration configuration;
-
-    private ProtocolContext protocolContext;
-
-    @Before
-    public void setup() throws Exception {
-
-        address = new InetSocketAddress("226.1.1.1", 60000);
-        configuration = new MulticastConfiguration();
-
-        protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
-        listener = new MulticastProtocolListener(5, address, configuration, 
protocolContext);
-        listener.start();
-
-        socket = MulticastUtils.createMulticastSocket(address.getPort(), 
configuration);
-    }
-
-    @After
-    public void teardown() throws IOException {
-        try {
-            if (listener.isRunning()) {
-                listener.stop();
-            }
-        } finally {
-            MulticastUtils.closeQuietly(socket);
-        }
-    }
-
-    @Ignore("This test must be reworked.  Requires an active network 
connection")
-    @Test
-    public void testBadRequest() throws Exception {
-        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
-        listener.addHandler(handler);
-        DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address);
-        socket.send(packet);
-        Thread.sleep(250);
-        assertEquals(0, handler.getMessages().size());
-    }
-
-    @Test
-    @Ignore
-    public void testRequest() throws Exception {
-
-        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
-        listener.addHandler(handler);
-
-        ProtocolMessage msg = new PingMessage();
-        MulticastProtocolMessage multicastMsg = new 
MulticastProtocolMessage("some-id", msg);
-
-        // marshal message to output stream
-        ProtocolMessageMarshaller marshaller = 
protocolContext.createMarshaller();
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        marshaller.marshal(multicastMsg, baos);
-        byte[] requestPacketBytes = baos.toByteArray();
-        DatagramPacket packet = new DatagramPacket(requestPacketBytes, 
requestPacketBytes.length, address);
-        socket.send(packet);
-
-        Thread.sleep(250);
-        assertEquals(1, handler.getMessages().size());
-        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
-
-    }
-
-    private class ReflexiveProtocolHandler implements ProtocolHandler {
-
-        private List<ProtocolMessage> messages = new ArrayList<>();
-
-        @Override
-        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
-            messages.add(msg);
-            return msg;
-        }
-
-        @Override
-        public boolean canHandle(ProtocolMessage msg) {
-            return true;
-        }
-
-        public List<ProtocolMessage> getMessages() {
-            return messages;
-        }
-
-    }
-
-    private class DelayedProtocolHandler implements ProtocolHandler {
-
-        private int delay = 0;
-
-        private List<ProtocolMessage> messages = new ArrayList<>();
-
-        public DelayedProtocolHandler(int delay) {
-            this.delay = delay;
-        }
-
-        @Override
-        public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
-            try {
-                messages.add(msg);
-                Thread.sleep(delay);
-                return null;
-            } catch (final InterruptedException ie) {
-                throw new ProtocolException(ie);
-            }
-
-        }
-
-        @Override
-        public boolean canHandle(ProtocolMessage msg) {
-            return true;
-        }
-
-        public List<ProtocolMessage> getMessages() {
-            return messages;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
deleted file mode 100644
index a759b86..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.UUID;
-
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.protocol.ConnectionRequest;
-import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import 
org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * @author unattributed
- */
-public class NodeProtocolSenderImplTest {
-
-    private SocketProtocolListener listener;
-
-    private NodeProtocolSenderImpl sender;
-
-    private DiscoverableService service;
-
-    private ServerSocketConfiguration serverSocketConfiguration;
-
-    private ClusterServiceLocator mockServiceLocator;
-
-    private ProtocolHandler mockHandler;
-
-    private NodeIdentifier nodeIdentifier;
-
-    @Before
-    public void setup() throws IOException {
-
-        serverSocketConfiguration = new ServerSocketConfiguration();
-
-        mockServiceLocator = mock(ClusterServiceLocator.class);
-        mockHandler = mock(ProtocolHandler.class);
-
-        nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, 
"localhost", 5678);
-
-        ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
-        listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, 
protocolContext);
-        listener.setShutdownListenerSeconds(3);
-        listener.addHandler(mockHandler);
-        listener.start();
-
-        service = new DiscoverableServiceImpl("some-service", new 
InetSocketAddress("localhost", listener.getPort()));
-
-        SocketConfiguration socketConfiguration = new SocketConfiguration();
-        socketConfiguration.setReuseAddress(true);
-        sender = new NodeProtocolSenderImpl(mockServiceLocator, 
socketConfiguration, protocolContext);
-    }
-
-    @After
-    public void teardown() throws IOException {
-        if (listener.isRunning()) {
-            listener.stop();
-        }
-    }
-
-    @Test
-    public void testConnect() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(service);
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        ConnectionResponseMessage mockMessage = new 
ConnectionResponseMessage();
-        mockMessage.setConnectionResponse(new 
ConnectionResponse(nodeIdentifier,
-                new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], 
new byte[0]), false, null, null, UUID.randomUUID().toString()));
-        
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
-
-        ConnectionRequestMessage request = new ConnectionRequestMessage();
-        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-        ConnectionResponseMessage response = sender.requestConnection(request);
-        assertNotNull(response);
-    }
-
-    @Test(expected = UnknownServiceAddressException.class)
-    public void testConnectNoClusterManagerAddress() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(null);
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
ConnectionResponseMessage());
-
-        ConnectionRequestMessage request = new ConnectionRequestMessage();
-        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
-        sender.requestConnection(request);
-        fail("failed to throw exception");
-    }
-
-    @Test(expected = ProtocolException.class)
-    public void testConnectBadResponse() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(service);
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new 
PingMessage());
-
-        ConnectionRequestMessage request = new ConnectionRequestMessage();
-        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
-        sender.requestConnection(request);
-        fail("failed to throw exception");
-
-    }
-
-    @Test(expected = ProtocolException.class)
-    public void testConnectDelayedResponse() throws Exception {
-
-        final int time = 250;
-        sender.getSocketConfiguration().setSocketTimeout(time);
-        when(mockServiceLocator.getService()).thenReturn(service);
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new 
Answer<ConnectionResponseMessage>() {
-            @Override
-            public ConnectionResponseMessage answer(InvocationOnMock 
invocation) throws Throwable {
-                Thread.sleep(time * 3);
-                return new ConnectionResponseMessage();
-            }
-        });
-        ConnectionRequestMessage request = new ConnectionRequestMessage();
-        request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
-        sender.requestConnection(request);
-        fail("failed to throw exception");
-
-    }
-
-    @Test
-    public void testHeartbeat() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(service);
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
-        HeartbeatMessage msg = new HeartbeatMessage();
-        HeartbeatPayload hbPayload = new HeartbeatPayload();
-        Heartbeat hb = new Heartbeat(new NodeIdentifier("id", "localhost", 3, 
"localhost", 4), false, false, hbPayload.marshal());
-        msg.setHeartbeat(hb);
-        sender.heartbeat(msg);
-    }
-
-    @Test
-    public void testNotifyControllerStartupFailure() throws Exception {
-
-        when(mockServiceLocator.getService()).thenReturn(service);
-        
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
-        ControllerStartupFailureMessage msg = new 
ControllerStartupFailureMessage();
-        msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, 
"some-addr", 1));
-        msg.setExceptionMessage("some exception");
-        sender.notifyControllerStartupFailure(msg);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
deleted file mode 100644
index 7a91c29..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler;
-import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.io.socket.SocketUtils;
-import org.junit.After;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author unattributed
- */
-public class SocketProtocolListenerTest {
-
-    private SocketProtocolListener listener;
-
-    private Socket socket;
-
-    private ProtocolMessageMarshaller<ProtocolMessage> marshaller;
-
-    private ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;
-
-    @Before
-    public void setup() throws Exception {
-
-        final ProtocolContext protocolContext = new 
JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-        marshaller = protocolContext.createMarshaller();
-        unmarshaller = protocolContext.createUnmarshaller();
-
-        ServerSocketConfiguration configuration = new 
ServerSocketConfiguration();
-        configuration.setSocketTimeout(1000);
-
-        listener = new SocketProtocolListener(5, 0, configuration, 
protocolContext);
-        listener.start();
-
-        int port = listener.getPort();
-
-        SocketConfiguration config = new SocketConfiguration();
-        config.setReuseAddress(true);
-        config.setSocketTimeout(1000);
-        socket = SocketUtils.createSocket(new InetSocketAddress("localhost", 
port), config);
-    }
-
-    @After
-    public void teardown() throws IOException {
-        try {
-            if (listener.isRunning()) {
-                listener.stop();
-            }
-        } finally {
-            SocketUtils.closeQuietly(socket);
-        }
-    }
-
-    @Test
-    public void testBadRequest() throws Exception {
-        DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
-        listener.addHandler(handler);
-        socket.getOutputStream().write(5);
-        Thread.sleep(250);
-        assertEquals(0, handler.getMessages().size());
-    }
-
-    @Test
-    public void testRequest() throws Exception {
-        ProtocolMessage msg = new PingMessage();
-
-        ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
-        listener.addHandler(handler);
-
-        // marshal message to output stream
-        marshaller.marshal(msg, socket.getOutputStream());
-
-        // unmarshall response and return
-        ProtocolMessage response = 
unmarshaller.unmarshal(socket.getInputStream());
-        assertEquals(msg.getType(), response.getType());
-
-        assertEquals(1, handler.getMessages().size());
-        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
-    }
-
-    @Test
-    public void testDelayedRequest() throws Exception {
-        ProtocolMessage msg = new PingMessage();
-
-        DelayedProtocolHandler handler = new DelayedProtocolHandler(2000);
-        listener.addHandler(handler);
-
-        // marshal message to output stream
-        marshaller.marshal(msg, socket.getOutputStream());
-
-        try {
-            socket.getInputStream().read();
-            fail("Socket timeout not received.");
-        } catch (SocketTimeoutException ste) {
-        }
-
-        assertEquals(1, handler.getMessages().size());
-        assertEquals(msg.getType(), handler.getMessages().get(0).getType());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
deleted file mode 100644
index 2f16777..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.testutils;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-
-/**
- * @author unattributed
- */
-public class DelayedProtocolHandler implements ProtocolHandler {
-
-    private int delay = 0;
-    private List<ProtocolMessage> messages = new ArrayList<>();
-
-    public DelayedProtocolHandler(int delay) {
-        this.delay = delay;
-    }
-
-    @Override
-    public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
-        try {
-            messages.add(msg);
-            Thread.sleep(delay);
-            return null;
-        } catch (final InterruptedException ie) {
-            throw new ProtocolException(ie);
-        }
-
-    }
-
-    @Override
-    public boolean canHandle(ProtocolMessage msg) {
-        return true;
-    }
-
-    public List<ProtocolMessage> getMessages() {
-        return messages;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7853e3c1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
deleted file mode 100644
index e80f52c..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.testutils;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-
-/**
- * @author unattributed
- */
-public class ReflexiveProtocolHandler implements ProtocolHandler {
-
-    private List<ProtocolMessage> messages = new ArrayList<>();
-
-    @Override
-    public ProtocolMessage handle(ProtocolMessage msg) throws 
ProtocolException {
-        messages.add(msg);
-        return msg;
-    }
-
-    @Override
-    public boolean canHandle(ProtocolMessage msg) {
-        return true;
-    }
-
-    public List<ProtocolMessage> getMessages() {
-        return messages;
-    }
-
-}

Reply via email to