http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml deleted file mode 100644 index 07ea7a4..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml +++ /dev/null @@ -1,110 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<!-- marked as lazy so that cluster protocol beans are not created when applications runs in standalone mode --> -<beans default-lazy-init="true" - xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns:util="http://www.springframework.org/schema/util" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd - http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd"> - - <!-- protocol context --> - <bean id="protocolContext" class="org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext"> - <constructor-arg> - <util:constant static-field="org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils.JAXB_CONTEXT"/> - </constructor-arg> - </bean> - - <!-- socket configuration --> - <bean id="protocolSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.SocketConfigurationFactoryBean"> - <property name="properties" ref="nifiProperties"/> - </bean> - - <!-- server socket configuration --> - <bean id="protocolServerSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.ServerSocketConfigurationFactoryBean"> - <property name="properties" ref="nifiProperties"/> - </bean> - - <!-- multicast configuration --> - <bean id="protocolMulticastConfiguration" class="org.apache.nifi.cluster.protocol.spring.MulticastConfigurationFactoryBean"> - <property name="properties" ref="nifiProperties"/> - </bean> - - <!-- cluster manager protocol sender --> - <bean id="clusterManagerProtocolSender" class="org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl"> - <constructor-arg ref="protocolSocketConfiguration"/> - <constructor-arg ref="protocolContext"/> - <property name="handshakeTimeout"> - <bean factory-bean="nifiProperties" factory-method="getClusterProtocolConnectionHandshakeTimeout"/> - </property> - </bean> - - <!-- cluster manager protocol listener --> - <bean id="clusterManagerProtocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener"> - <constructor-arg index="0"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerProtocolThreads"/> - </constructor-arg> - <constructor-arg index="1"> - <bean factory-bean="nifiProperties" factory-method="getClusterManagerProtocolPort"/> - </constructor-arg> - <constructor-arg ref="protocolServerSocketConfiguration" index="2"/> - <constructor-arg ref="protocolContext" index="3"/> - </bean> - - <!-- cluster manager sender/listener --> - <bean id="clusterManagerProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener"> - <constructor-arg ref="clusterManagerProtocolSender"/> - <constructor-arg ref="clusterManagerProtocolListener"/> - </bean> - - <!-- node protocol sender --> - <bean id="nodeProtocolSender" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderImpl"> - <constructor-arg ref="clusterManagerProtocolServiceLocator"/> - <constructor-arg ref="protocolSocketConfiguration"/> - <constructor-arg ref="protocolContext"/> - </bean> - - <!-- node protocol listener --> - <bean id="nodeProtocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener"> - <constructor-arg index="0"> - <bean factory-bean="nifiProperties" factory-method="getClusterNodeProtocolThreads"/> - </constructor-arg> - <constructor-arg index="1"> - <bean factory-bean="nifiProperties" factory-method="getClusterNodeProtocolPort"/> - </constructor-arg> - <constructor-arg ref="protocolServerSocketConfiguration" index="2"/> - <constructor-arg ref="protocolContext" index="3"/> - </bean> - - <!-- node sender/listener --> - <bean id="nodeProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener"> - <constructor-arg ref="nodeProtocolSender"/> - <constructor-arg ref="nodeProtocolListener"/> - </bean> - - <!-- cluster services broadcaster --> - <bean id="clusterServicesBroadcaster" class="org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster"> - <constructor-arg index="0"> - <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/> - </constructor-arg> - <constructor-arg ref="protocolMulticastConfiguration" index="1"/> - <constructor-arg ref="protocolContext" index="2"/> - <constructor-arg index="3"> - <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastServiceBroadcastDelay"/> - </constructor-arg> - </bean> - -</beans>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java deleted file mode 100644 index 59837c1..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener; -import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl; -import java.io.IOException; -import java.net.InetAddress; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.io.socket.ServerSocketConfiguration; -import org.apache.nifi.io.socket.SocketConfiguration; -import org.junit.After; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Test; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * @author unattributed - */ -public class ClusterManagerProtocolSenderImplTest { - - private InetAddress address; - - private int port; - - private SocketProtocolListener listener; - - private ClusterManagerProtocolSenderImpl sender; - - private ProtocolHandler mockHandler; - - @Before - public void setup() throws IOException { - - address = InetAddress.getLocalHost(); - ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration(); - serverSocketConfiguration.setSocketTimeout(2000); - - mockHandler = mock(ProtocolHandler.class); - - ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext); - listener.addHandler(mockHandler); - listener.start(); - - port = listener.getPort(); - - SocketConfiguration socketConfiguration = new SocketConfiguration(); - sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, protocolContext); - } - - @After - public void teardown() throws IOException { - if(listener.isRunning()) { - listener.stop(); - } - } - - @Test - public void testRequestFlow() throws Exception { - - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage()); - FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); - FlowResponseMessage response = sender.requestFlow(request); - assertNotNull(response); - } - - @Test - public void testRequestFlowWithBadResponseMessage() throws Exception { - - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); - FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); - try { - sender.requestFlow(request); - fail("failed to throw exception"); - } catch(ProtocolException pe) {} - - } - - @Test - public void testRequestFlowDelayedResponse() throws Exception { - - final int time = 250; - sender.getSocketConfiguration().setSocketTimeout(time); - - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() { - @Override - public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(time * 3); - return new FlowResponseMessage(); - } - }); - FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); - try { - sender.requestFlow(request); - fail("failed to throw exception"); - } catch(ProtocolException pe) {} - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java deleted file mode 100644 index e3703e2..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastUtils; -import org.junit.After; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * @author unattributed - */ -public class ClusterServiceDiscoveryTest { - - private ClusterServiceDiscovery discovery; - - private String serviceName; - - private MulticastSocket socket; - - private InetSocketAddress multicastAddress; - - private MulticastConfiguration configuration; - - private ProtocolContext protocolContext; - - @Before - public void setup() throws Exception { - - serviceName = "some-service"; - multicastAddress = new InetSocketAddress("225.1.1.1", 22222); - configuration = new MulticastConfiguration(); - - protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - discovery = new ClusterServiceDiscovery(serviceName, multicastAddress, configuration, protocolContext); - discovery.start(); - - socket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration); - } - - @After - public void teardown() throws IOException { - try { - if(discovery.isRunning()) { - discovery.stop(); - } - } finally { - MulticastUtils.closeQuietly(socket); - } - } - - @Ignore("Test needs to be fixed. Requires an active network connection") - @Test - public void testGetAddressOnStartup() { - assertNull(discovery.getService()); - } - - @Ignore("This test has an NPE after ignoring another...perhaps has a bad inter-test dependency") - @Test - public void testGetAddressAfterBroadcast() throws Exception { - - ServiceBroadcastMessage msg = new ServiceBroadcastMessage(); - msg.setServiceName("some-service"); - msg.setAddress("3.3.3.3"); - msg.setPort(1234); - - // marshal message to output stream - ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - marshaller.marshal(msg, baos); - byte[] requestPacketBytes = baos.toByteArray(); - DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, multicastAddress); - socket.send(packet); - - Thread.sleep(250); - - InetSocketAddress updatedAddress = discovery.getService().getServiceAddress(); - assertEquals("some-service", discovery.getServiceName()); - assertEquals("3.3.3.3", updatedAddress.getHostName()); - assertEquals(1234, updatedAddress.getPort()); - - } - - @Ignore("Test needs to be fixed. Requires an active network connection") - @Test - public void testBadBroadcastMessage() throws Exception { - - ProtocolMessage msg = new PingMessage(); - - // marshal message to output stream - ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - marshaller.marshal(msg, baos); - byte[] requestPacketBytes = baos.toByteArray(); - DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, multicastAddress); - socket.send(packet); - - Thread.sleep(250); - - assertNull(discovery.getService()); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java deleted file mode 100644 index b1c156b..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator; -import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Test; -import static org.mockito.Mockito.*; -import org.mockito.stubbing.OngoingStubbing; - -public class ClusterServiceLocatorTest { - - private ClusterServiceDiscovery mockServiceDiscovery; - - private int fixedPort; - - private DiscoverableService fixedService; - - private ClusterServiceLocator serviceDiscoveryLocator; - - private ClusterServiceLocator serviceDiscoveryFixedPortLocator; - - private ClusterServiceLocator fixedServiceLocator; - - @Before - public void setup() throws Exception { - - fixedPort = 1; - mockServiceDiscovery = mock(ClusterServiceDiscovery.class); - fixedService = new DiscoverableServiceImpl("some-service", InetSocketAddress.createUnresolved("some-host", 20)); - - serviceDiscoveryLocator = new ClusterServiceLocator(mockServiceDiscovery); - serviceDiscoveryFixedPortLocator = new ClusterServiceLocator(mockServiceDiscovery, fixedPort); - fixedServiceLocator = new ClusterServiceLocator(fixedService); - - } - - @Test - public void getServiceWhenServiceDiscoveryNotStarted() { - assertNull(serviceDiscoveryLocator.getService()); - } - - @Test - public void getServiceWhenServiceDiscoveryFixedPortNotStarted() { - assertNull(serviceDiscoveryLocator.getService()); - } - - @Test - public void getServiceWhenFixedServiceNotStarted() { - assertEquals(fixedService, fixedServiceLocator.getService()); - } - - @Test - public void getServiceNotOnFirstAttempt() { - - ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig(); - config.setNumAttempts(2); - config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS); - config.setTimeBetweenAttempts(1); - - serviceDiscoveryLocator.setAttemptsConfig(config); - - OngoingStubbing<DiscoverableService> stubbing = null; - for(int i = 0; i < config.getNumAttempts() - 1; i++) { - if(stubbing == null) { - stubbing = when(mockServiceDiscovery.getService()).thenReturn(null); - } else { - stubbing.thenReturn(null); - } - } - stubbing.thenReturn(fixedService); - - assertEquals(fixedService, serviceDiscoveryLocator.getService()); - - } - - @Test - public void getServiceNotOnFirstAttemptWithFixedPort() { - - ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig(); - config.setNumAttempts(2); - config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS); - config.setTimeBetweenAttempts(1); - - serviceDiscoveryFixedPortLocator.setAttemptsConfig(config); - - OngoingStubbing<DiscoverableService> stubbing = null; - for(int i = 0; i < config.getNumAttempts() - 1; i++) { - if(stubbing == null) { - stubbing = when(mockServiceDiscovery.getService()).thenReturn(null); - } else { - stubbing.thenReturn(null); - } - } - stubbing.thenReturn(fixedService); - - InetSocketAddress resultAddress = InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(), fixedPort); - DiscoverableService resultService = new DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress); - assertEquals(resultService, serviceDiscoveryFixedPortLocator.getService()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java deleted file mode 100644 index ec1f26d..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster; -import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener; -import java.net.InetSocketAddress; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.junit.After; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * @author unattributed - */ -public class ClusterServicesBroadcasterTest { - - private ClusterServicesBroadcaster broadcaster; - - private MulticastProtocolListener listener; - - private DummyProtocolHandler handler; - - private InetSocketAddress multicastAddress; - - private DiscoverableService broadcastedService; - - private ProtocolContext protocolContext; - - private MulticastConfiguration configuration; - - @Before - public void setup() throws Exception { - - broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111)); - - multicastAddress = new InetSocketAddress("225.1.1.1", 22222); - - configuration = new MulticastConfiguration(); - - protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms"); - broadcaster.addService(broadcastedService); - - handler = new DummyProtocolHandler(); - listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext); - listener.addHandler(handler); - } - - @After - public void teardown() { - - if(broadcaster.isRunning()) { - broadcaster.stop(); - } - - try { - if(listener.isRunning()) { - listener.stop(); - } - } catch(Exception ex) { - ex.printStackTrace(System.out); - } - - } - - @Ignore("fails needs to be fixed") - @Test - public void testBroadcastReceived() throws Exception { - - broadcaster.start(); - listener.start(); - - Thread.sleep(1000); - - listener.stop(); - - assertNotNull(handler.getProtocolMessage()); - assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType()); - final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage(); - assertEquals(broadcastedService.getServiceName(), msg.getServiceName()); - assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress()); - assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort()); - } - - private class DummyProtocolHandler implements ProtocolHandler { - - private ProtocolMessage protocolMessage; - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - this.protocolMessage = msg; - return null; - } - - public ProtocolMessage getProtocolMessage() { - return protocolMessage; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java deleted file mode 100644 index 4233d88..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.util.ArrayList; -import java.util.List; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastUtils; -import org.junit.After; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -/** - * @author unattributed - */ -public class MulticastProtocolListenerTest { - - private MulticastProtocolListener listener; - - private MulticastSocket socket; - - private InetSocketAddress address; - - private MulticastConfiguration configuration; - - private ProtocolContext protocolContext; - - @Before - public void setup() throws Exception { - - address = new InetSocketAddress("226.1.1.1", 60000); - configuration = new MulticastConfiguration(); - - protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - listener = new MulticastProtocolListener(5, address, configuration, protocolContext); - listener.start(); - - socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration); - } - - @After - public void teardown() throws IOException { - try { - if(listener.isRunning()) { - listener.stop(); - } - } finally { - MulticastUtils.closeQuietly(socket); - } - } - - @Ignore("Test needs to be reworked. Fails if on a system without actiev network connection") - @Test - public void testBadRequest() throws Exception { - DelayedProtocolHandler handler = new DelayedProtocolHandler(0); - listener.addHandler(handler); - DatagramPacket packet = new DatagramPacket(new byte[] {5}, 1, address); - socket.send(packet); - Thread.sleep(250); - assertEquals(0, handler.getMessages().size()); - } - - @Ignore("this test works sometimes and fails others - needs work to be reliable") - @Test - public void testRequest() throws Exception { - - ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler(); - listener.addHandler(handler); - - ProtocolMessage msg = new PingMessage(); - MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg); - - // marshal message to output stream - ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - marshaller.marshal(multicastMsg, baos); - byte[] requestPacketBytes = baos.toByteArray(); - DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address); - socket.send(packet); - - Thread.sleep(250); - assertEquals(1, handler.getMessages().size()); - assertEquals(msg.getType(), handler.getMessages().get(0).getType()); - - } - - private class ReflexiveProtocolHandler implements ProtocolHandler { - - private List<ProtocolMessage> messages = new ArrayList<>(); - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - messages.add(msg); - return msg; - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } - - } - - private class DelayedProtocolHandler implements ProtocolHandler { - - private int delay = 0; - - private List<ProtocolMessage> messages = new ArrayList<>(); - - public DelayedProtocolHandler(int delay) { - this.delay = delay; - } - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - try { - messages.add(msg); - Thread.sleep(delay); - return null; - } catch(final InterruptedException ie) { - throw new ProtocolException(ie); - } - - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java deleted file mode 100644 index 1c5ba9e..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator; -import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener; -import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderImpl; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.UUID; - -import org.apache.nifi.cluster.protocol.ConnectionRequest; -import org.apache.nifi.cluster.protocol.ConnectionResponse; -import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.StandardDataFlow; -import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; -import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.io.socket.ServerSocketConfiguration; -import org.apache.nifi.io.socket.SocketConfiguration; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; - -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * @author unattributed - */ -@Ignore("Randomly tests... probably timing-specific") -public class NodeProtocolSenderImplTest { - - private SocketProtocolListener listener; - - private NodeProtocolSenderImpl sender; - - private DiscoverableService service; - - private ServerSocketConfiguration serverSocketConfiguration; - - private ClusterServiceLocator mockServiceLocator; - - private ProtocolHandler mockHandler; - - private NodeIdentifier nodeIdentifier; - - @Before - public void setup() throws IOException { - - serverSocketConfiguration = new ServerSocketConfiguration(); - - mockServiceLocator = mock(ClusterServiceLocator.class); - mockHandler = mock(ProtocolHandler.class); - - nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678); - - ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); - - listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext); - listener.setShutdownListenerSeconds(3); - listener.addHandler(mockHandler); - listener.start(); - - service = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", listener.getPort())); - - SocketConfiguration socketConfiguration = new SocketConfiguration(); - socketConfiguration.setReuseAddress(true); - sender = new NodeProtocolSenderImpl(mockServiceLocator, socketConfiguration, protocolContext); - } - - @After - public void teardown() throws IOException { - if(listener.isRunning()) { - listener.stop(); - } - } - - @Test - public void testConnect() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - ConnectionResponseMessage mockMessage = new ConnectionResponseMessage(); - mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString())); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage); - - ConnectionRequestMessage request = new ConnectionRequestMessage(); - request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); - ConnectionResponseMessage response = sender.requestConnection(request); - assertNotNull(response); - } - - @Test(expected = UnknownServiceAddressException.class) - public void testConnectNoClusterManagerAddress() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(null); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage()); - - ConnectionRequestMessage request = new ConnectionRequestMessage(); - request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); - - sender.requestConnection(request); - fail("failed to throw exception"); - } - - @Test(expected = ProtocolException.class) - public void testConnectBadResponse() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); - - ConnectionRequestMessage request = new ConnectionRequestMessage(); - request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); - - sender.requestConnection(request); - fail("failed to throw exception"); - - } - - @Test(expected = ProtocolException.class) - public void testConnectDelayedResponse() throws Exception { - - final int time = 250; - sender.getSocketConfiguration().setSocketTimeout(time); - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<ConnectionResponseMessage>() { - @Override - public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(time * 3); - return new ConnectionResponseMessage(); - } - }); - ConnectionRequestMessage request = new ConnectionRequestMessage(); - request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); - - sender.requestConnection(request); - fail("failed to throw exception"); - - } - - @Test - public void testHeartbeat() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); - - HeartbeatMessage hb = new HeartbeatMessage(); - hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, new byte[] {1, 2, 3})); - sender.heartbeat(hb); - } - - @Test - public void testNotifyControllerStartupFailure() throws Exception { - - when(mockServiceLocator.getService()).thenReturn(service); - when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); - when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); - - ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage(); - msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1)); - msg.setExceptionMessage("some exception"); - sender.notifyControllerStartupFailure(msg); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java deleted file mode 100644 index 07ee83a..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl.testutils; - -import java.util.ArrayList; -import java.util.List; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; - -/** - * @author unattributed - */ -public class DelayedProtocolHandler implements ProtocolHandler { - - private int delay = 0; - private List<ProtocolMessage> messages = new ArrayList<>(); - - public DelayedProtocolHandler(int delay) { - this.delay = delay; - } - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - try { - messages.add(msg); - Thread.sleep(delay); - return null; - } catch (final InterruptedException ie) { - throw new ProtocolException(ie); - } - - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java deleted file mode 100644 index 4e3b932..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl.testutils; - -import java.util.ArrayList; -import java.util.List; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; - -/** - * @author unattributed - */ -public class ReflexiveProtocolHandler implements ProtocolHandler { - - private List<ProtocolMessage> messages = new ArrayList<>(); - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - messages.add(msg); - return msg; - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/.gitignore ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/.gitignore b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/.gitignore deleted file mode 100755 index ea8c4bf..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml deleted file mode 100644 index 18e4ba4..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/pom.xml +++ /dev/null @@ -1,48 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework</artifactId> - <version>0.0.3-incubating-SNAPSHOT</version> - </parent> - <artifactId>nifi-framework-cluster-web</artifactId> - <packaging>jar</packaging> - <description>The clustering software for communicating with the NiFi web api.</description> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-web-optimistic-locking</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-administration</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-user-actions</artifactId> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java deleted file mode 100644 index 44fb25a..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -import java.io.Serializable; -import java.util.List; -import org.apache.nifi.action.Action; -import org.apache.nifi.web.Revision; - -/** - * Contains contextual information about clustering that may be serialized - * between manager and node when communicating over HTTP. - */ -public interface ClusterContext extends Serializable { - - /** - * Returns a list of auditable actions. The list is modifiable - * and will never be null. - * @return a collection of actions - */ - List<Action> getActions(); - - Revision getRevision(); - - void setRevision(Revision revision); - - /** - * @return true if the request was sent by the cluster manager; false otherwise - */ - boolean isRequestSentByClusterManager(); - - /** - * Sets the flag to indicate if a request was sent by the cluster manager. - * @param flag true if the request was sent by the cluster manager; false otherwise - */ - void setRequestSentByClusterManager(boolean flag); - - /** - * Gets an id generation seed. This is used to ensure that nodes are able to generate the - * same id across the cluster. This is usually handled by the cluster manager creating the - * id, however for some actions (snippets, templates, etc) this is not possible. - * @return - */ - String getIdGenerationSeed(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java deleted file mode 100644 index 06907d2..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import org.apache.nifi.action.Action; -import org.apache.nifi.web.Revision; - -/** - * A basic implementation of the context. - */ -public class ClusterContextImpl implements ClusterContext, Serializable { - - private final List<Action> actions = new ArrayList<>(); - - private Revision revision; - - private boolean requestSentByClusterManager; - - private final String idGenerationSeed = UUID.randomUUID().toString(); - - @Override - public List<Action> getActions() { - return actions; - } - - @Override - public Revision getRevision() { - return revision; - } - - @Override - public void setRevision(Revision revision) { - this.revision = revision; - } - - @Override - public boolean isRequestSentByClusterManager() { - return requestSentByClusterManager; - } - - @Override - public void setRequestSentByClusterManager(boolean requestSentByClusterManager) { - this.requestSentByClusterManager = requestSentByClusterManager; - } - - @Override - public String getIdGenerationSeed() { - return this.idGenerationSeed; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java deleted file mode 100644 index 012e7c7..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -/** - * Manages a cluster context on a threadlocal. - */ -public class ClusterContextThreadLocal { - - private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>(); - - public static void removeContext() { - contextHolder.remove(); - } - - public static ClusterContext createEmptyContext() { - return new ClusterContextImpl(); - } - - public static ClusterContext getContext() { - ClusterContext ctx = contextHolder.get(); - if(ctx == null) { - ctx = createEmptyContext(); - contextHolder.set(ctx); - } - return ctx; - } - - public static void setContext(final ClusterContext context) { - contextHolder.set(context); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java deleted file mode 100644 index 90b8a37..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.web; - -import org.apache.nifi.cluster.context.ClusterContext; -import org.apache.nifi.cluster.context.ClusterContextThreadLocal; - -/** - * An optimistic locking manager that provides for optimistic locking in a clustered - * environment. - * - * @author unattributed - */ -public class ClusterAwareOptimisticLockingManager implements OptimisticLockingManager { - - private final OptimisticLockingManager optimisticLockingManager; - - public ClusterAwareOptimisticLockingManager(final OptimisticLockingManager optimisticLockingManager) { - this.optimisticLockingManager = optimisticLockingManager; - } - - @Override - public Revision checkRevision(Revision revision) throws InvalidRevisionException { - final Revision currentRevision = getRevision(); - if(currentRevision.equals(revision) == false) { - throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, currentRevision)); - } else { - return revision.increment(revision.getClientId()); - } - } - - @Override - public boolean isCurrent(Revision revision) { - return getRevision().equals(revision); - } - - @Override - public Revision getRevision() { - final ClusterContext ctx = ClusterContextThreadLocal.getContext(); - if(ctx == null || ctx.getRevision() == null) { - return optimisticLockingManager.getRevision(); - } else { - return ctx.getRevision(); - } - } - - @Override - public void setRevision(final Revision revision) { - final ClusterContext ctx = ClusterContextThreadLocal.getContext(); - if(ctx != null) { - ctx.setRevision(revision); - } - optimisticLockingManager.setRevision(revision); - } - - @Override - public Revision incrementRevision() { - final Revision currentRevision = getRevision(); - final Revision incRevision = currentRevision.increment(); - setRevision(incRevision); - return incRevision; - } - - @Override - public Revision incrementRevision(final String clientId) { - final Revision currentRevision = getRevision(); - final Revision incRevision = currentRevision.increment(clientId); - setRevision(incRevision); - return incRevision; - } - - @Override - public String getLastModifier() { - return optimisticLockingManager.getLastModifier(); - } - - @Override - public void setLastModifier(final String lastModifier) { - optimisticLockingManager.setLastModifier(lastModifier); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/.gitignore ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/.gitignore b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/.gitignore deleted file mode 100755 index ea8c4bf..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml deleted file mode 100644 index dd24804..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml +++ /dev/null @@ -1,130 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework</artifactId> - <version>0.0.3-incubating-SNAPSHOT</version> - </parent> - <artifactId>nifi-framework-cluster</artifactId> - <packaging>jar</packaging> - <description>The clustering software for NiFi.</description> - <dependencies> - <!-- application core dependencies --> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-logging-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-client-dto</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-core-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-cluster-protocol</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-cluster-web</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-web-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-administration</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-site-to-site</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-site-to-site-client</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </dependency> - <!-- third party dependencies --> - - <!-- sun dependencies --> - <dependency> - <groupId>javax.servlet</groupId> - <artifactId>javax.servlet-api</artifactId> - </dependency> - - <!-- commons dependencies --> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - <dependency> - <groupId>commons-net</groupId> - <artifactId>commons-net</artifactId> - </dependency> - - <!-- jersey dependencies --> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - </dependency> - - <!-- spring dependencies --> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java deleted file mode 100644 index 0b70c61..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.client; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; -import java.util.Timer; -import java.util.TimerTask; - -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple multicast test client that sends ping messages to a group address. - */ -public class MulticastTestClient { - - private static final Logger logger = LoggerFactory.getLogger(MulticastTestClient.class); - - private static final int PING_DELAY_SECONDS = 3; - - public static void main(final String... args) throws IOException { - - String group = System.getProperty("group", "225.0.0.0"); - if (group == null) { - System.out.println("Host system property 'group' was not given."); - return; - } - group = group.trim(); - if (group.length() == 0) { - System.out.println("Host system property 'group' must be non-empty."); - return; - } - - final String portStr = System.getProperty("port", "2222"); - final int port; - try { - port = Integer.parseInt(portStr); - } catch (final NumberFormatException nfe) { - System.out.println("Port system property 'port' was not a valid port."); - return; - } - - logger.info(String.format("Pinging every %s seconds using multicast address: %s:%s.", PING_DELAY_SECONDS, group, port)); - logger.info("Override defaults by using system properties '-Dgroup=<Class D IP>' and '-Dport=<unused port>'."); - logger.info("The test client may be stopped by entering a newline at the command line."); - - final InetSocketAddress addr = new InetSocketAddress(group, port); - final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT); - final MulticastConfiguration multicastConfig = new MulticastConfiguration(); - multicastConfig.setReuseAddress(true); - - // setup listener - final MulticastProtocolListener listener = new MulticastProtocolListener(1, addr, multicastConfig, protocolContext); - listener.addHandler(new ProtocolHandler() { - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - final PingMessage pingMsg = (PingMessage) msg; - final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss", Locale.US); - logger.info("Pinged at: " + sdf.format(pingMsg.getDate())); - return null; - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - }); - - // setup socket - final MulticastSocket multicastSocket = MulticastUtils.createMulticastSocket(multicastConfig); - - // setup broadcaster - final Timer broadcaster = new Timer("Multicast Test Client", /** - * is daemon * - */ - true); - - try { - - // start listening - listener.start(); - - // start broadcasting - broadcaster.schedule(new TimerTask() { - - @Override - public void run() { - try { - - final PingMessage msg = new PingMessage(); - msg.setDate(new Date()); - - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - marshaller.marshal(msg, baos); - final byte[] packetBytes = baos.toByteArray(); - - // send message - final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, addr); - multicastSocket.send(packet); - - } catch (final Exception ex) { - logger.warn("Failed to send message due to: " + ex, ex); - } - } - }, 0, PING_DELAY_SECONDS * 1000); - - // block until any input is received - System.in.read(); - - } finally { - broadcaster.cancel(); - if (listener.isRunning()) { - listener.stop(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java deleted file mode 100644 index 6bc5d6c..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.event; - -import java.util.Date; -import org.apache.commons.lang3.StringUtils; - -/** - * Events describe the occurrence of something noteworthy. They record the - * event's source, a timestamp, a description, and a category. - * - * @author unattributed - * - * @Immutable - */ -public class Event { - - public static enum Category { - - DEBUG, - INFO, - WARN - } - - private final String source; - - private final long timestamp; - - private final Category category; - - private final String message; - - /** - * Creates an event with the current time as the timestamp and a category of - * "INFO". - * - * @param source the source - * @param message the description - */ - public Event(final String source, final String message) { - this(source, message, Category.INFO); - } - - /** - * Creates an event with the current time as the timestamp. - * - * @param source the source - * @param message the description - * @param category the event category - */ - public Event(final String source, final String message, final Category category) { - this(source, message, category, new Date().getTime()); - } - - /** - * Creates an event with the a category of "INFO". - * - * @param source the source - * @param message the description - * @param timestamp the time of occurrence - */ - public Event(final String source, final String message, final long timestamp) { - this(source, message, Category.INFO, timestamp); - } - - /** - * Creates an event. - * - * @param source the source - * @param message the description - * @param category the event category - * @param timestamp the time of occurrence - */ - public Event(final String source, final String message, final Category category, final long timestamp) { - - if (StringUtils.isBlank(source)) { - throw new IllegalArgumentException("Source may not be empty or null."); - } else if (StringUtils.isBlank(message)) { - throw new IllegalArgumentException("Event message may not be empty or null."); - } else if (category == null) { - throw new IllegalArgumentException("Event category may not be null."); - } else if (timestamp < 0) { - throw new IllegalArgumentException("Timestamp may not be negative: " + timestamp); - } - - this.source = source; - this.message = message; - this.category = category; - this.timestamp = timestamp; - } - - public Category getCategory() { - return category; - } - - public String getMessage() { - return message; - } - - public String getSource() { - return source; - } - - public long getTimestamp() { - return timestamp; - } - -}